opensearch-project / opensearch-spark

Spark Accelerator framework ; It enables secondary indices to remote data stores.
Apache License 2.0

[WIP] PPL geoip function pt. 2 #871

Open kenrickyap opened 2 weeks ago

kenrickyap commented 2 weeks ago

Description PPL geoip function

Issues Resolved https://github.com/opensearch-project/opensearch-spark/issues/672

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

This PR is a continuation of https://github.com/opensearch-project/opensearch-spark/pull/781, opened because I lack permission to push to the forked branch used in that PR.

kenrickyap commented 2 weeks ago

Hi @YANG-DB, I heard from Anas that you had a method of implementing ip2geo functionality for Spark. I wanted to check with you that our current approach aligns with your method.

Current Plan: Leveraging SerializableUdf, create a UDF that does the following:

  1. Check if an in-memory cache object for the datasource exists.
  2. If the cache object does not exist, create a new in-memory cache object from the CSV retrieved from the datasource manifest (the manifest-to-CsvParser logic can be taken from geospatial ip2geo).
  3. Search cached object for GeoIP data.
  4. Return GeoIP data.

This PR has a stub UDF implementation to give a better idea of how this would be implemented.

All of this would have to be implemented within the Spark library, as currently I am not aware of how to access any geospatial artifacts. If you know of a better way to implement ip2geo please let me know! Thanks!

YANG-DB commented 2 weeks ago

@kenrickyap please update the DCO (contributor sign-off)

kenrickyap commented 2 weeks ago

Flow diagram for ip2geo UDF

[Diagram: mermaid-diagram-2024-11-06-140042]

Design Details

YANG-DB commented 2 weeks ago

Design Details

  • Ip2GeoCache will be a Guava Cache that has the datasource string as key and a CidrTree as value
  • CidrTree will be a trie (will use the Apache Commons PatriciaTrie, as Apache Commons is already included in the project)

    • A trie is used instead of a map because it is better suited to longest-prefix matching, and IP-to-CIDR matching is a similar task

    • each CidrTreeNode will store:

      • nth bit value of the CIDR (n is the depth in the tree)

      • geo_data if there is a matching CIDR row in the datasource CSV

      • child CidrTreeNodes

  • Will retrieve CsvParser from manifest using similar methodology as in geospatial ip2geo
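The CidrTreeNode idea can be illustrated with a small binary trie. This is only a sketch in plain Python under assumptions: it does not use PatriciaTrie, the `insert` and `lookup` helpers are hypothetical, and one trie is assumed per IP version so that IPv4 and IPv6 bit strings never mix.

```python
import ipaddress


class CidrTreeNode:
    """One node per prefix bit; geo_data is set only where a CIDR row ends."""

    def __init__(self):
        self.children = {}    # bit ("0" or "1") -> CidrTreeNode
        self.geo_data = None  # payload of the CIDR ending at this depth


def to_bits(address):
    # Fixed-width bit string: 32 bits for IPv4, 128 bits for IPv6.
    return format(int(address), "0{}b".format(address.max_prefixlen))


def insert(root, cidr, geo_data):
    network = ipaddress.ip_network(cidr)
    node = root
    # Walk only the first prefixlen bits; the depth equals the mask length.
    for bit in to_bits(network.network_address)[:network.prefixlen]:
        node = node.children.setdefault(bit, CidrTreeNode())
    node.geo_data = geo_data


def lookup(root, ip):
    node, best = root, root.geo_data
    for bit in to_bits(ipaddress.ip_address(ip)):
        node = node.children.get(bit)
        if node is None:
            break
        if node.geo_data is not None:
            best = node.geo_data  # a deeper match is a longer prefix
    return best
```

A lookup walks at most 32 nodes for IPv4 (128 for IPv6) and remembers the deepest node that carries geo_data, which is the longest matching prefix.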

Pros

  • ip2geo functionality is achieved.
  • implementation is simple and does not depend on any additional libraries that do not already exist in the project.

Cons

  • calculations are done in-memory as a UDF, this means that multiple instances of Ip2GeoCache will be created in distributed Spark systems and they will not sync.
  • not leveraging job-scheduler to run ip2geo task and not leveraging OpenSearch to store ip2geo data as geospatial does.

@kenrickyap a few questions here:

kenrickyap commented 2 weeks ago

@kenrickyap a few questions here:

  • how / is the Ip2GeoCache memory shared between sessions ? is it a singleton ?

Yes we will create Ip2GeoCache to be a singleton. From my understanding this should allow the cache to be accessible between sessions

  • plz also add the use case where the geo_data_source will be a table / index (in OpenSearch) / service (API) - lets create a general purpose facade for this to hide different datasource drivers

Will update the flow diagram to reflect facade for different datasource.

However, would it be possible to provide a service (API) example usage? I am not too sure what such a datasource would be expected to return.

Also, would there be a fixed schema for an index in OpenSearch?
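One possible shape for such a facade, sketched in plain Python. All class and function names here are hypothetical and are not part of this PR; the point is only that the caller sees a single `lookup` method regardless of whether the data comes from a CSV, a table or index, or an API.

```python
from abc import ABC, abstractmethod
import ipaddress


class GeoIpDataSource(ABC):
    """Hypothetical facade: callers only see lookup(), never the driver."""

    @abstractmethod
    def lookup(self, ip):
        """Return geo data for the IP, or None if nothing matches."""


class InMemoryDataSource(GeoIpDataSource):
    """Driver backed by (cidr, geo_data) pairs held in memory."""

    def __init__(self, rows):
        self._rows = [(ipaddress.ip_network(cidr), geo) for cidr, geo in rows]

    def lookup(self, ip):
        address = ipaddress.ip_address(ip)
        for network, geo in self._rows:
            if address in network:
                return geo
        return None


class CallableDataSource(GeoIpDataSource):
    """Driver wrapping any function ip -> geo_data (table query, API client)."""

    def __init__(self, fetch):
        self._fetch = fetch

    def lookup(self, ip):
        return self._fetch(ip)


def geoip(source, ip):
    # The function only depends on the facade, not on a concrete driver.
    return source.lookup(ip)
```

What an API-backed or index-backed driver should return is still the open question above; the sketch simply assumes every driver returns the same geo_data shape.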

  • I would like to see a more detailed description of the CidrTreeNode including an example and simple high level explanations (the geo tree diagram ?)

In hindsight I will just use a hashmap to store the CIDR geo_data, where the CIDR bitstring is the key and geo_data is the value. To perform the IP-to-CIDR matching, I will implement a lookup function that converts the IP to a bitstring and shortens it one bit at a time until a key is found in the map.

Initially I wanted to leverage the prefixMap function, as I thought it would find the best-fitting CIDR mask for a given IP in O(1), which would mean I wouldn't have to implement my own lookup function. However, while trying to implement this I noticed that prefixMap takes a prefix and finds all keys that have that prefix, which is the opposite of what I want.
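A minimal sketch of the hashmap lookup described here, in plain Python. The helper names are hypothetical, and it assumes a separate map per IP version, since an IPv4 and an IPv6 prefix could otherwise produce the same bitstring key.

```python
import ipaddress


def to_bits(address):
    # Fixed-width bit string: 32 bits for IPv4, 128 bits for IPv6.
    return format(int(address), "0{}b".format(address.max_prefixlen))


def cidr_key(cidr):
    # Key = the first prefixlen bits of the network address.
    network = ipaddress.ip_network(cidr)
    return to_bits(network.network_address)[:network.prefixlen]


def build_map(rows):
    return {cidr_key(cidr): geo_data for cidr, geo_data in rows}


def lookup(cidr_map, ip):
    bits = to_bits(ipaddress.ip_address(ip))
    # Try the full bitstring first, then drop one trailing bit at a time,
    # so the first hit is the longest (most specific) matching prefix.
    for length in range(len(bits), -1, -1):
        geo_data = cidr_map.get(bits[:length])
        if geo_data is not None:
            return geo_data
    return None
```

Each lookup costs at most 33 map probes for IPv4 and 129 for IPv6, independent of how many CIDR rows are stored.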

  • explain why is it worth while to add this trie-tree instead of using a hash-map / search ?

As mentioned above will use a hash-map instead of trie.

  • can u also add a pseudo-code here (in the issue) to clarify the composition ?

Will have pseudo-code and code added by EOD

kenrickyap commented 1 week ago

@kenrickyap can u plz update the PR status and progress ? thanks

@YANG-DB we are nearly done with the implementation; I am currently testing the geoip functionality against TestDatasourceDao, which provides a mock stream of geo data.

@jduo is implementing the manifest dao, in parallel

Also would it be ok to move API and OpenSearch index DAO implementation to another ticket? We are not sure what the specifications for this would be as there are no examples, and am not sure this would be done by Nov 15th.

YANG-DB commented 1 week ago

@kenrickyap plz fix the DCO error ...

YANG-DB commented 1 week ago

@LantaoJin I would appreciate u'r review here as well...

qianheng-aws commented 1 week ago

  • Ip2GeoCache will be a Guava Cache that has datasource string as key and CidrTree as value

@kenrickyap questions about the design:

  1. The UDF will only be executed on Spark Executor. So the cache won't take effect actually, it will be destroyed once the task has finished. Do we have mechanism to avoid this?
  2. The CSV data is nearly 500MB for the datasource of https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json, I guess it will get bigger if populating a trie tree on it. I think it will be a big memory pressure for Spark Executor.
  3. Have we ever considered retrieving the geo data by calling the related API of service like OpenSearch geospatial? Then we don't have to load the above CSV in executor memory. But I'm not quite sure whether it's feasible to do that on Serverless Spark.

kenrickyap commented 1 week ago

  • Ip2GeoCache will be a Guava Cache that has datasource string as key and CidrTree as value

@kenrickyap questions about the design:

  1. The UDF will only be executed on Spark Executor. So the cache won't take effect actually, it will be destroyed once the task has finished. Do we have mechanism to avoid this?

The JVM will persist on the worker node, so udf tasks that are carried out in the same worker node will be able to leverage the cache. In any case from above comments we will be shifting away from using a singleton and implement some other method of persisting queried results.

  2. The CSV data is nearly 500MB for the datasource of https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json, I guess it will get bigger if populating a trie tree on it. I think it will be a big memory pressure for Spark Executor.

Yes, this will be the case. This would be an initial method of performing ip2geo; there is a plan to add less expensive methods of performing the operation in the future.

  3. Have we ever considered retrieving the geo data by calling the related API of service like OpenSearch geospatial? Then we don't have to load the above CSV in executor memory. But I'm not quite sure whether it's feasible to do that on Serverless Spark.

Yes, the initial plan was to use the geospatial plugin, but geospatial does not publish any jars, and I could not find a way to leverage its functionality from Spark. A client jar for the plugin is currently being created; the plan is discussed here: https://github.com/opensearch-project/sql/issues/3038. The intent was to have a follow-up task that uses this client once it is implemented; however, as there was an urgent ask for this functionality, we opted to implement an in-engine method in parallel.

There has also been an ask to be able to retrieve data from an OpenSearch index and from general APIs; these methods will also be added in a future issue.

LantaoJin commented 1 week ago

  1. The UDF will only be executed on Spark Executor. So the cache won't take effect actually, it will be destroyed once the task has finished. Do we have mechanism to avoid this?

The JVM will persist on the worker node, so udf tasks that are carried out in the same worker node will be able to leverage the cache.

Not exactly. It depends on how the user deploys Spark. In most cases, when dynamic resource allocation is enabled and an executor has been idle for a while (e.g. no active task has run on it for 60 seconds), the executor process (JVM) is released, and the cache no longer takes effect.

  2. The CSV data is nearly 500MB for the datasource of https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json, I guess it will get bigger if populating a trie tree on it. I think it will be a big memory pressure for Spark Executor.

Yes, this will be the case. This would be an initial method of performing ip2geo; there is a plan to add less expensive methods of performing the operation in the future.

What's the plan? I can't agree with this initial method, where each executor holds a large amount of memory and the cache doesn't take effect. At the very least, the memory/cache should be built lazily, for the benefit of users who don't use this geoip function.

LantaoJin commented 1 week ago

My thought is that, from the perspective of an open-source project, we only need to provide an interface for using IP services and a configuration-based data source connector, but not actually provide specific IP lookup services. Users need to configure or deploy IP data source services that they can use. For example, users need to complete the configuration and authorization of web services such as https://dev.maxmind.com/geoip/geolocate-an-ip/web-services/. For Amazon OpenSearch Service (AOS), an internal service for IP lookup should be deployed.

Another thought is that we create two tables (IPv4 and IPv6) in Glue as built-in tables. Each IP lookup can then be converted to a Join operator with one of the IP tables.

YANG-DB commented 1 week ago

I agree with @LantaoJin about the interface approach - but we can definitely support a default in-memory implementation for this to work out of the box

kenrickyap commented 1 week ago

GeoIP Implementation Options and Path Forward

Current Status

The in-engine implementation of GeoIP functionality has core features working, with remaining work focused on tests, parameter filtering, and caching improvements.

Implementation Options

Option 1: In-Engine Implementation

Approach: Perform IP lookups within the engine using in-memory processing

Pros:

Cons:

Option 2: External Service Integration

Approach: Offload IP lookups to external services or database

Pros:

Cons:

Technical Implementation Details

In-Engine Implementation

External Service Implementation

Proposed Approach

We’re considering two potential paths:

  1. Phased Implementation:
    • Implement in-engine solution first
    • Add external service support later
    • Allows for gradual transition
  2. Direct External Implementation:
    • Focus directly on external service integration
    • Skip in-engine implementation
    • Faster path to scalable solution

Next Steps

  1. Complete current PR to release-ready state
  2. Create RFC for API interface design
  3. Present both options to community for feedback
  4. Proceed based on community consensus

Discussion Points

Seeking Feedback On

kenrickyap commented 1 week ago

@YANG-DB question regarding key needed for user session caching, would this be information obtainable from the Spark instance? Was looking through the environment variables and saw hadoop.http.staticuser.user, would this or some other spark env param be suitable? Another option is to have the user input some configuration on Spark start up.

YANG-DB commented 1 week ago

@kenrickyap we need to validate this with different spark providers such as the local out of the box spark / EMR managed spark and such...

kenrickyap commented 1 week ago

Based on discussion with @YANG-DB. Will shift implementation to only support Spark tables (with intention to support other datasources in future), where task of creating lookup table will be pushed to user. This shift will mean that there is no longer any reliance on UDF and performing ip2geo lookup in memory.

Specifically:

We will provide ability to use other datasources for ip lookup in future tasks.

LantaoJin commented 6 days ago

Based on discussion with @YANG-DB. Will shift implementation to only support Spark tables (with intention to support other datasources in future), where task of creating lookup table will be pushed to user. This shift will mean that there is no longer any reliance on UDF and performing ip2geo lookup in memory.

Specifically:

  • geoip default behaviour is to perform join on predefined spark table. (Table name will be provided as config during spark startup.)
  • pre-defined table schema is expected for geoip function to work. This will be documented.
  • part of PR is to provide user with instructions to create spark geoip table.

We will provide ability to use other datasources for ip lookup in future tasks.

Sounds good. I like this solution, but it's still very challenging! The primary problem to solve in this solution is how to build a Partitioner (a trie-based partitioner?). HashPartitioner doesn't work with CIDR notation. For example, 1.0.1.0/24 covers all addresses from 1.0.1.0 to 1.0.1.255, so otherwise we would have to expand every IPv4 and IPv6 address into a row of a Spark table. PS: Expanding all of IPv4 would generate about 4 billion rows, and IPv6 about 3.4 × 10^38.
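The sizes involved in a full expansion can be checked with a few lines of Python. This is only arithmetic on the address widths (32 bits for IPv4, 128 bits for IPv6); for reference, 2^64 (about 1.84 × 10^19) is the number of /64 prefixes, not the number of individual IPv6 addresses.

```python
import ipaddress

# Number of distinct addresses for each IP version.
ipv4_total = 2 ** 32       # 4294967296, roughly 4 billion
ipv6_total = 2 ** 128      # roughly 3.4e38
ipv6_slash64 = 2 ** 64     # number of /64 prefixes, roughly 1.84e19

# A single /24 block expands to 256 rows, from .0 through .255.
block = ipaddress.ip_network("1.0.1.0/24")
first, last = block[0], block[-1]
rows_for_block = block.num_addresses
```

Even a single /24 row becomes 256 rows when expanded, which is why a range-based representation is preferable to enumerating addresses.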

normanj-bitquill commented 6 days ago

@LantaoJin A subnet can be represented as an integer range. For example 192.168.0.0/24 is:

>= 3,232,235,520 (11000000 10101000 00000000 00000000)
<= 3,232,235,775 (11000000 10101000 00000000 11111111)

Can a partitioner work with integer ranges?
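The conversion from a subnet to an integer range can be sketched with Python's standard `ipaddress` module. The helper name `cidr_to_range` is hypothetical; the values reproduce the 192.168.0.0/24 example above.

```python
import ipaddress


def cidr_to_range(cidr):
    """Return (first, last) address of a CIDR block as integers."""
    network = ipaddress.ip_network(cidr)
    # broadcast_address is the last address in the block (also for IPv6).
    return int(network.network_address), int(network.broadcast_address)


start, end = cidr_to_range("192.168.0.0/24")
print(start, end)  # 3232235520 3232235775
```

The same function works for IPv6 blocks, although the resulting integers are 128 bits wide.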

kenrickyap commented 6 days ago

@LantaoJin @YANG-DB These are the current details for the new implementation. Please let us know if there are any concerns or comments :)

Feature will include two parts:

Details on Scala Script

User flow

Details of geoip implementation

geoip will have 3 inputs:

ip2geo lookup details

To perform ip2geo look up we will perform join on geoip_table with the following conditions:

Implementation clarification questions

YANG-DB commented 6 days ago

@LantaoJin @YANG-DB These are the current details for the new implementation. Please let us know if there are any concerns or comments :)

Feature will include two parts:

  • Scala script provided in docs that user will run to download data and create spark table.
  • geoip function that will perform join on specified table.

Details on Scala Script

User flow

  • user will run script on spark-shell
  • script will:

    1. download manifest from https://geoip.maps.opensearch.org/v1/geolite2-city/manifest.json and retrieve csv.
    2. parse csv and create spark dataframe.
    3. create new spark table geoip_table from dataframe.
       • table will have the following schema: cidr, country_iso_code, country_name, continent_name, region_iso_code, region_name, city_name, time_zone, location
    4. add following columns to geoip_table:
       • ip_range_start - type: long - first ip address in CIDR range (will ensure range is distinct and does not overlap)
       • ip_range_end - type: long - last ip address in CIDR range (will ensure range is distinct and does not overlap)
       • ip_type - type: string - IPv4 or IPv6
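The derived columns can be illustrated on a couple of rows in plain Python. This is not the Scala script itself: the sample CSV uses placeholder values and only two of the schema's columns, and `add_range_columns` is a hypothetical helper. One thing the sketch makes visible is that IPv6 bounds are 128-bit integers, so they would not fit into a 64-bit long column as-is.

```python
import csv
import io
import ipaddress

# Placeholder rows; the real CSV comes from the manifest and has more columns.
SAMPLE_CSV = """cidr,country_name
192.0.2.0/24,Country A
2001:db8::/32,Country B
"""


def add_range_columns(row):
    """Return a copy of the row with the three derived columns added."""
    network = ipaddress.ip_network(row["cidr"])
    enriched = dict(row)
    enriched["ip_range_start"] = int(network.network_address)
    enriched["ip_range_end"] = int(network.broadcast_address)
    enriched["ip_type"] = "IPv4" if network.version == 4 else "IPv6"
    return enriched


rows = [add_range_columns(r) for r in csv.DictReader(io.StringIO(SAMPLE_CSV))]
```

How the IPv6 bounds should be stored (for example as a decimal or string type) is left open here, since the thread does not specify it.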

Details of geoip implementation

geoip will have 3 inputs:

  • datasource - currently unused (will keep for future feature of letting users query API/OpenSearch index datasources).
  • ip_address - ip_address to get geo_info on
  • parameters - which columns will be selected from geoip_table.

ip2geo lookup details

To perform the ip2geo lookup, we will perform a join on geoip_table with the following conditions:

  • geoip_table.ip_range_start <= ip_address <= geoip_table.ip_range_end

    • the geoip_table name will be provided at spark start up as config (e.g. --conf spark.geoip.tablename=geoip_table)
  • ip_address.type = ip_type (will check IP type using inet library)
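The two join conditions can be tried out on an in-memory stand-in for geoip_table. This is a plain-Python sketch, not the Spark join: it uses the ip_range_start, ip_range_end and ip_type columns from the schema above, and `make_row` and the placeholder country names are hypothetical.

```python
import ipaddress


def make_row(cidr, country_name):
    # Hypothetical helper that mimics one row of geoip_table.
    network = ipaddress.ip_network(cidr)
    return {
        "ip_range_start": int(network.network_address),
        "ip_range_end": int(network.broadcast_address),
        "ip_type": "IPv4" if network.version == 4 else "IPv6",
        "country_name": country_name,
    }


GEOIP_TABLE = [
    make_row("192.0.2.0/24", "Country A"),
    make_row("2001:db8::/32", "Country B"),
]


def geoip(ip_address, parameters):
    address = ipaddress.ip_address(ip_address)
    ip_type = "IPv4" if address.version == 4 else "IPv6"
    value = int(address)
    for row in GEOIP_TABLE:
        # Condition 1: the address lies inside the row's integer range.
        in_range = row["ip_range_start"] <= value <= row["ip_range_end"]
        # Condition 2: the address type matches the row's ip_type.
        if in_range and row["ip_type"] == ip_type:
            # parameters selects which columns are returned.
            return {name: row[name] for name in parameters}
    return None
```

The scan over every row mirrors what a join without range-aware optimization has to do for each input address.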

Implementation clarification questions

  • Regarding AWS EMR Spark, will the shell script be able to download manifest?

    • If not we can alter the script such that user needs to provide filepath to script, and offload csv retrieval to user
  • Regarding @LantaoJin's suggestions to use Partitioner, I am not too sure how to integrate this given the current design. Currently my guess is that each node of the partition would be a different sub range, and we would query sub range directly to reduce load on join? Is this the right interpretation or am I missing something?

Hi @kenrickyap Thanks for the detailed information 👍 here are my comments:

YANG-DB commented 6 days ago

@kenrickyap another request - please run explain cost | ip2geo ... to see how the query cost is estimated by the engine with relation to the amount of actual rows..

kenrickyap commented 6 days ago

Thanks for the feedback :) have updated design to address comments. As for question responses:

  • what is the expected size of this table ? (rows / memory) - plz test and verify

Table has about 5 million rows, and is about 500 MB.

  • please run explain cost | ip2geo ... to see how the query cost is estimated by the engine with relation to the amount of actual rows..

Will verify this when the command is implemented

LantaoJin commented 5 days ago

ip2geo lookup details

To perform ip2geo look up we will perform join on geoip_table with the following conditions:

  • geoip_table.ip_range_start <= ip_address <= geoip_table.ip_range_end

  • Regarding @LantaoJin's suggestions to use Partitioner, I am not too sure how to integrate this given the current design.

The integer ranges from ip_range_start to ip_range_end for each CIDR block should be ordered and non-overlapping. Since Spark has a default RangePartitioning implementation, a Join operator with the join condition geoip_table.ip_range_start <= ip_address AND ip_address <= geoip_table.ip_range_end would ideally be executed as a range join. But I realized there is currently no range join implementation in Apache Spark or EMR Spark (Databricks Runtime supports it: https://docs.databricks.com/en/optimizations/range-join.html), so the join will end up being executed as a NestedLoopJoin. I therefore think the performance won't be good, but there is no need to consider how to leverage RangePartitioning right now.
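To illustrate why ordered, non-overlapping ranges matter, here is a plain-Python sketch of a lookup that uses binary search instead of comparing against every row. It is not a Spark range join; `build_index` and `find_range` are hypothetical names, and a single IP version per index is assumed.

```python
import bisect
import ipaddress


def build_index(cidrs):
    """Sort the ranges by start so they can be binary-searched."""
    ranges = sorted(
        (int(n.network_address), int(n.broadcast_address), str(n))
        for n in map(ipaddress.ip_network, cidrs)
    )
    starts = [start for start, _end, _cidr in ranges]
    return starts, ranges


def find_range(index, ip):
    starts, ranges = index
    value = int(ipaddress.ip_address(ip))
    # Rightmost range whose start is <= value; valid only because
    # the ranges are assumed not to overlap.
    pos = bisect.bisect_right(starts, value) - 1
    if pos >= 0 and value <= ranges[pos][1]:
        return ranges[pos][2]
    return None
```

A nested-loop join compares each input address with every table row, whereas this search needs O(log n) comparisons per address once the ranges are sorted.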

kenrickyap commented 6 hours ago

Hi @YANG-DB, regarding the data-loading script to create the geoip table, we have some clarification questions about EMR.

If the above is not possible we can make an alternate script for EMR users that will format the csv to the correct schema then the users will upload this file to s3 instead.

YANG-DB commented 5 hours ago

Hi @YANG-DB regarding the data-loading script to create the geoip table. we have some clarification questions regarding EMR.

  • Since the script is a Scala shell script it would require the spark shell to run, does EMR have a console that can interact with the spark instance?
  • Current understanding is that users will upload a csv with the geoip data to an s3 bucket accessible to EMR, would the script be able to access this csv using some file path?

If the above is not possible we can make an alternate script for EMR users that will format the csv to the correct schema then the users will upload this file to s3 instead.

@kenrickyap I would like to see a solution that addresses both: