mongodb-labs / mongo-arrow

MongoDB integrations for Apache Arrow. Export MongoDB documents to numpy array, parquet files, and pandas dataframes in one line of code.
https://mongo-arrow.readthedocs.io
Apache License 2.0

Schema questions for pymongoarrow when converting from pymongo #239

Open covatic-john opened 1 month ago

covatic-john commented 1 month ago

We have 3 MongoDB collections that each hold a permissions object, and all 3 differ slightly in structure:

"permissions": [
    {
        "activity": "never"
    },
    {
        "pushNotifications": "always",
        "location": "foreground"
    }
],
"permissions": {
    "geolocation": "prompt"
},
"permissions": [
    {
        "activity": "never"
    },
    {
        "location": "foreground"
    },
    {
        "pushNotifications": "always"
    }
],

In PyMongo I could just project the object and then convert the pandas column to a string with df[c] = df[c].astype(pd.StringDtype()),

then write to Parquet using fastparquet as the engine, with output like this:

[{'location': 'notRequested'}, {'activity': 'never'}, {'pushNotifications': 'never'}, {'backgroundAuthStatus': 'permitted'}, {'att': 'denied'}, {'isPrecise': 'notRequested'}, {'adPersonalisation': 'true'}]

I am having issues converting this to use PyMongoArrow. If I set the schema entry to "permissions": pa.list_(pa.string()), I get null/None. I have tried using pa.struct, but then I get empty values for the items that are missing in the structure.

Currently the projection in my query is:

'permissions': {
                    '$map': {
                        'input': '$os.permissions',
                        'as': 'permission',
                        'in': {
                            '$function': {
                                'body': 'function(perm) { return JSON.stringify(perm); }',
                                'args': [
                                    '$$permission'
                                ],
                                'lang': 'js'
                            }
                        }
                    }
                },

with a schema element of "permissions": pa.list_(pa.string()), but I then need to convert the column with df['permissions'] = df['permissions'].apply(list).astype(str).str.replace("'", "").str.replace('"', "'")

There must be an easier way to deal with these JSON objects as strings. Ultimately they end up in Redshift, so they can be parsed in queries. Any help or suggestions would be welcome for something I thought would be quite simple.
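For comparison, here is a minimal sketch of doing the string conversion on the client with the standard json module instead of chained str.replace calls. It assumes the values are plain Python lists or dicts, as PyMongo returns them; permissions_to_json is a hypothetical helper name, not part of any library. If the documents contained BSON-specific types such as ObjectId, bson.json_util.dumps from PyMongo would be needed instead of json.dumps.

```python
import json


def permissions_to_json(value):
    """Serialize a permissions value (list or dict) to a compact JSON string."""
    if value is None:
        return None
    return json.dumps(value, separators=(",", ":"))


# Two of the shapes from the collections described above.
list_of_objects = [
    {"activity": "never"},
    {"pushNotifications": "always", "location": "foreground"},
]
single_object = {"geolocation": "prompt"}

# str() gives the Python repr with single quotes, which is not valid JSON.
print(str(list_of_objects))
# json.dumps gives double-quoted JSON that round-trips through json.loads.
print(permissions_to_json(list_of_objects))
print(permissions_to_json(single_object))
```

With a pandas column of such values, the helper could be applied as df["permissions"] = df["permissions"].map(permissions_to_json). Unlike the legacy single-quoted output, the result is valid JSON regardless of whether the field is a list or a single object.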

I have spent 3 days messing with Mongo data while migrating to PyMongoArrow. The other collections have been a breeze: memory consumption has come down and there is a speed improvement.

John

aclark4life commented 1 month ago

Thank you for the question! Tracking here: https://jira.mongodb.org/browse/ARROW-253

aclark4life commented 1 month ago

in pymongo I could just project the object and then convert the pandas column into string df[c] = df[c].astype(pd.StringDtype()) then using Fastparquet as the engine write to parquet with the output like this [{'location': 'notRequested'}, {'activity': 'never'}, {'pushNotifications': 'never'}, {'backgroundAuthStatus': 'permitted'}, {'att': 'denied'}, {'isPrecise': 'notRequested'}, {'adPersonalisation': 'true'}]

@covatic-john Can you please explain in more detail how you are doing the transformation with PyMongo described above? Thank you

covatic-john commented 1 month ago

morning,

My original PyMongo query is:

    query = [
        {
            "$match": {
                "_id": {
                    "$gte": ObjectId.from_datetime(start_date),
                    "$lt": ObjectId.from_datetime(start_date + timedelta(hours=12)),
                },
                "client_id": ObjectId(client_id),
            }
        },
        {"$addFields": {"domain": {"$arrayElemAt": [{"$split": ["$data.href", "/"]}, 2]}}},
        {
            "$project": {
                "_id": 0,
                "analytics_id": "$_id",
                "framework_id": 1,
                "client_id": 1,
                "domain": "$domain",
                "primary_directory": "$secondary",
                "app_version": "$originator.app_version",
                "report_date": "$data.timestamp.ts",
                "collation_date": "$data.timestamp.ts",
                "os": "$inferred.os",
                "acorn_code": "$data.home.acorn_code",
                "lives_in": "$data.home.lives_in",
                "permissions": "$data.permissions",
               ....
            }
        },
    ]
    analytics_list = list(db.browser_device_data.aggregate(query, batchSize=5000, allowDiskUse=True))
    return analytics_list

Then, before writing the Parquet file, I just converted the DataFrame columns to the types I wanted:

import pandas as pd


def convert_df_types(df):
    """Convert DataFrame columns to the appropriate data types."""
    # Nullable pandas dtypes for the numeric columns.
    numeric_dtypes = {
        "client_key": "Int64",
        "brand_key": "Int64",
        "home_fallback": "Int64",
        "acorn_code": "Int64",
        "sei_version": "Int64",
        "memory": "Float64",
        "screen_width": "Float64",
        "screen_height": "Float64",
        "total_consumption_count": "Int64",
        "total_consumption_duration": "Int64",
    }
    # Columns excluded from the string conversion. "framework_key" has no
    # explicit conversion, so it keeps whatever dtype it already has.
    columns_not_string = {"collation_date", "framework_key", *numeric_dtypes}

    if "collation_date" in df.columns:
        df["collation_date"] = pd.to_datetime(df["collation_date"], errors="coerce", utc=True)
    for column, dtype in numeric_dtypes.items():
        if column in df.columns:
            df[column] = df[column].astype(dtype)
    # Every remaining column becomes a nullable string column.
    for c in set(df.columns) - columns_not_string:
        df[c] = df[c].astype(pd.StringDtype())

    return df

The Parquet file is then uploaded to S3 and crawled by Glue. This was legacy code not written by me, but it is nothing complicated. The only issue I see with my original statement is the schema I was trying, which was "permissions": pa.list_(pa.string()),

this should have been

"permissions": pa.string(),

but I played around a lot and finally have

                "permissions": {
                    "$function": {
                        "body": "function(permissions) { return JSON.stringify(permissions); }",
                        "args": ["$os.permissions"],
                        "lang": "js"
                    }
                },

using a schema of "permissions": pa.string().

I could not seem to get any data back from

analytics_table = collection.aggregate_arrow_all(query, schema=schema, allowDiskUse=True)

without using the JS functions.
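Putting the final pieces together, the approach described above might be sketched as follows. The helper names (stringify_field, build_pipeline, fetch_permissions_table) are hypothetical, and the pipeline is cut down to the permissions field only. The fetch function uses PyMongoArrow's module-level aggregate_arrow_all with a Schema and needs a live collection, so its imports are kept inside the function. Note that $function requires server-side JavaScript to be enabled on the deployment.

```python
def stringify_field(field_path):
    """Build a $function expression that returns JSON.stringify of a field."""
    return {
        "$function": {
            "body": "function(value) { return JSON.stringify(value); }",
            "args": [field_path],
            "lang": "js",
        }
    }


def build_pipeline(match_filter):
    """Build a pipeline that projects permissions as a single JSON string."""
    return [
        {"$match": match_filter},
        {"$project": {"_id": 0, "permissions": stringify_field("$os.permissions")}},
    ]


def fetch_permissions_table(collection, match_filter):
    """Run the pipeline and return an Arrow table (needs a live collection)."""
    # Imported here so the pipeline helpers work without these packages.
    import pyarrow as pa
    from pymongoarrow.api import Schema, aggregate_arrow_all

    # One string column, matching the "permissions": pa.string() schema above.
    schema = Schema({"permissions": pa.string()})
    return aggregate_arrow_all(
        collection, build_pipeline(match_filter), schema=schema, allowDiskUse=True
    )
```

Because the server returns one string per document, the same schema works whether the stored permissions field is a list of objects or a single object.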