Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

Patch request not updating as expected and missing some updates #34284

Closed SomanathSankaran closed 1 year ago

SomanathSankaran commented 1 year ago

Describe the bug: Patch requests are not updating documents as expected; some updates are missing.

Exception or Stack Trace: N/A

To Reproduce
Steps to reproduce the behavior:

We pre-populate the documents with an empty array, then patch them with the foreachBatch function below, iterating through the DataFrame once per minute key in the array.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def write_multi_window_cosmos1min(df, epochID):
    # Build a deterministic document id of the form
    # <warehouseid>_<year>_<month>_<day>_<hour>_1 from the window end timestamp,
    # and reuse it as the partition key column (warehouseid).
    final_df = df.withColumn('id',
                F.concat(
                        F.col('warehouseid'),
                        F.lit('_'),
                        F.year(F.col('window.end')).cast(T.StringType()),
                        F.lit('_'),
                        F.month(F.col('window.end')).cast(T.StringType()),
                        F.lit('_'),
                        F.dayofmonth(F.col('window.end')).cast(T.StringType()),
                        F.lit('_'),
                        F.hour(F.col('window.end')).cast(T.StringType()),
                        F.lit('_1')
                    )).withColumn('warehouseid', F.col('id'))

    print(f'1 min window with batch id {epochID}')
    final_df.show(truncate=False)

    # Collect the minute keys present in this micro-batch.
    minute_data = final_df.selectExpr('MINUTE(window.start) as minute_key').collect()

    for minute in minute_data:
        curr_minute = minute['minute_key']

        # Patch only the array element at index curr_minute of time_array.
        cfg["spark.cosmos.write.patch.columnConfigs"] = \
            f"[col(time_array).path(/time_array/{curr_minute}).op(set)]"

        print('1 min window -> ', cfg["spark.cosmos.write.patch.columnConfigs"])

        final_df\
            .selectExpr('CAST(warehouseid AS STRING) as warehouseid',
                        'CAST(id AS STRING) as id',
                        'window.end as curr_tsp',
                        'MINUTE(window.end) as minute_key',
                        'if(actualTemperatureCount IS NULL, 0, actualTemperatureCount) as actualShipRate',
                        'if(expectedTemperatureCount IS NULL, 0, expectedTemperatureCount) as plannedShipRate'
                    )\
            .filter(f'minute_key = {curr_minute}')\
            .groupBy('id', 'warehouseid', 'minute_key')\
            .agg(F.expr('struct(MAX(CAST(minute_key AS INT)) AS time_key, '
                        'MAX(actualShipRate) AS actualShipRate, '
                        'MAX(plannedShipRate) AS plannedShipRate)').alias('time_array'))\
            .drop('minute_key')\
            .write\
            .format('cosmos.oltp')\
            .options(**cfg)\
            .mode('append')\
            .save()
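
For context, here is a minimal sketch of what one per-minute patch is expected to do to a target document. The document shape (including the 60-slot array) is an assumption based on "we pre-populate data with empty array" above; the values are illustrative only.

# Hypothetical document shape, assumed from the description above
# (pre-populated with a placeholder array, one slot per minute):
doc_before = {
    "id": "wh1_2023_4_10_9_1",          # <warehouseid>_<year>_<month>_<day>_<hour>_1
    "warehouseid": "wh1_2023_4_10_9_1",
    "time_array": [None] * 60,
}

# After a write with columnConfigs
#   [col(time_array).path(/time_array/5).op(set)]
# the element at array index 5 should hold the aggregated struct:
doc_after = dict(doc_before)
doc_after["time_array"] = list(doc_before["time_array"])
doc_after["time_array"][5] = {
    "time_key": 5,
    "actualShipRate": 10,    # illustrative values
    "plannedShipRate": 12,
}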

Code Snippet: Cosmos config

cfg = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": cosmosDatabaseName,
    "spark.cosmos.container": cosmosContainerName,
    "spark.cosmos.diagnostics": "simple",
    "spark.cosmos.write.strategy": "ItemPatch",
    "spark.cosmos.write.bulk.enabled": "true",
    "checkpointLocation": "/mnt/nrtpoclanding/multi_window_agg/cosmos-test/checkpointLocation"
}

Expected behavior: The patch updates the data at the expected array index.

Note: we don't see any 429 or 500 errors on the Cosmos DB side.

ghost commented 1 year ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kushagraThapar, @TheovanKraay

kushagraThapar commented 1 year ago

@xinlian12 please take a look when you get a chance, thanks!

TheovanKraay commented 1 year ago

@SomanathSankaran the issue is not clearly explained, can you please expand? Please also see the sample here on creating a custom function to patch raw JSON using the Spark connector; it may help: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/quickstart-spark?tabs=scala#raw-json-support-for-spark-connector.
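
For readers following that link, here is a minimal, hedged sketch of the "custom function to build raw JSON" pattern the comment refers to. The output column name (rawJson) and the write options are assumptions here, not confirmed connector API; see the linked quickstart for the connector's actual contract.

import json
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Hypothetical helper: serialize one row into the raw JSON document body.
def to_raw_json(id_, warehouseid, time_array):
    return json.dumps({
        "id": id_,
        "warehouseid": warehouseid,
        "time_array": time_array,
    })

raw_json_udf = F.udf(to_raw_json, T.StringType())

# Assuming df has id, warehouseid, and time_array columns:
# raw_df = df.select(raw_json_udf('id', 'warehouseid', 'time_array').alias('rawJson'))
# raw_df.write.format('cosmos.oltp').options(**cfg).mode('append').save()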

github-actions[bot] commented 1 year ago

Hi @SomanathSankaran. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

github-actions[bot] commented 1 year ago

Hi @SomanathSankaran, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!

SomanathSankaran commented 11 months ago

Hi @TheovanKraay, I was able to figure out the issue: it is due to the Spark object, which is shared and gets updated when we query it from multiple parallel threads. In the meantime, I will try the raw JSON approach.
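
Based on that root cause, here is a minimal sketch of one way to avoid shared mutable state in the repro above: take a per-iteration copy of the config dict instead of mutating the shared cfg. This assumes cfg is the shared object involved; it is an illustration, not a confirmed fix.

for minute in minute_data:
    curr_minute = minute['minute_key']

    # Copy the shared config so concurrent threads / later iterations
    # cannot observe each other's patch columnConfigs.
    local_cfg = dict(cfg)
    local_cfg["spark.cosmos.write.patch.columnConfigs"] = (
        f"[col(time_array).path(/time_array/{curr_minute}).op(set)]"
    )

    (final_df
        .filter(f'MINUTE(window.end) = {curr_minute}')
        .write
        .format('cosmos.oltp')
        .options(**local_cfg)
        .mode('append')
        .save())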