Open golfalot opened 1 week ago
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kushagraThapar @pjohari-ms @TheovanKraay.
@golfalot can you please share the config in your Scala/Python notebook here (preferably the full code if feasible), and the throughput in your container? We'll address the spark connector versioning issue separately (I doubt it has a bearing on your issue with throughput control).
Thank you @TheovanKraay
We're seeking a solution where we can load fast without saturating RUs, so that client apps don't get any 429s during the bulk load/refresh.
We are writing 1M structs (scaled down for this test) to a single partition (I know... but needs must, as Data API builder doesn't currently support hierarchical partition keys).
The Cosmos DB database is set to autoscale with a shared max of 31k RU/s. With a single container, and everything landing in one logical partition, we're capping out at a theoretical max of 10k RU/s; we understand that limitation.
# Query executor sizing so we can repartition to one Spark partition per core
cores_per_executor = int(sc.getConf().get("spark.executor.cores"))
print(f"Cores per executor: {cores_per_executor}")
num_executors = int(spark.conf.get("spark.executor.instances"))
print(f"Number of executors: {num_executors}")
# Total number of cores = target number of DataFrame partitions
total_cores = num_executors * cores_per_executor
path = "/gold/price_paid/factors_with_category_type_with_history_cosmos"
# read in data
df_1m_sample = (
spark.read.format('delta')
.load(path).drop('version')
.withColumn('version', F.lit('data_202408_abp_107_build_v1'))
.filter(F.col('price_paid_version') == '202408')
.withColumnRenamed('price_paid_version', 'data_version')
.withColumn('id', F.col('uprn').cast('string'))
.orderBy(F.col('uprn'))
.limit(1000000)
.repartition(total_cores)
)
# force into memory for testing only
df_to_write = spark.createDataFrame(df_1m_sample.collect(), schema=df_1m_sample.schema)
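As an aside, an equivalent way to force the sample into memory without collecting 1M rows to the driver would be to cache it on the executors; a minimal sketch (same intent, just a different materialization strategy):

# alternative to collect(): cache the partitions on the executors and trigger an action
df_to_write = df_1m_sample.cache()
df_to_write.count()  # forces the read so the subsequent write timing excludes the Delta scan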
(
df_to_write
.write
.format("cosmos.oltp")
.option("spark.synapse.linkedService", "CosmosDB_GraphQL_rating_datasets_by_source_01_to_25")
.option("spark.cosmos.container", 'pricePaidFixedRUv2')
.mode("APPEND")
.save()
)
(
df_to_write
.write
.mode("APPEND") # do not delete container before writing
.format("cosmos.oltp")
.option("spark.synapse.linkedService", "CosmosDB_GraphQL_rating_datasets_by_source_01_to_25")
.option("spark.cosmos.container", 'pricePaidFixedRUv2')
.option('spark.cosmos.throughputControl.enabled','true')
.option("spark.cosmos.write.bulk.initialBatchSize", 1)
.option('spark.cosmos.throughputControl.name','pricePaid_Loader_Jobs_Group')
.option('spark.cosmos.throughputControl.globalControl.database','<redacted>')
.option('spark.cosmos.throughputControl.globalControl.container','ThroughputControl')
.option('spark.cosmos.throughputControl.accountKey', TokenLibrary.getSecretWithLS("<redacted>","read-write-cosmos-elastic-reference-data-uks")) # need to use a proper secret here, but variable ok for now
.option('spark.cosmos.throughputControl.targetThroughput', 9500)
.save()
)
For reference, here are the documents in the throughput control container:
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA91ad6c1e-92a9-4db9-92ec-b940a7ee6f9a",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"0b06f411-0000-1100-0000-673601b60000\"",
"ttl": 11,
"initializeTime": "2024-11-14T13:29:25.754977Z",
"loadFactor": 1.1566333920010767,
"allocatedThroughput": 4634.486770433088,
"_rid": "fawUALp-HLmfAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmfAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731592630
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA0575a16f-4e17-4c5a-8d4e-e7b99e46077d",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"0b06d211-0000-1100-0000-673601b60000\"",
"ttl": 11,
"initializeTime": "2024-11-14T13:29:25.649294Z",
"loadFactor": 1.2142908911603594,
"allocatedThroughput": 4998.574744514014,
"_rid": "fawUALp-HLmeAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmeAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731592630
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3R0LTAuMw.info",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.config",
"targetThroughput": "",
"targetThroughputThreshold": "0.3",
"isDefault": false,
"_rid": "fawUALp-HLmKAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmKAAAAAAAAAA==/",
"_etag": "\"b1058a1b-0000-1100-0000-6734dacf0000\"",
"_attachments": "attachments/",
"_ts": 1731517135
}
Doing something "dumb" and unintuitive, .option('spark.cosmos.throughputControl.targetThroughput', 28000), gives us 6.12k RU/s sustained with a few hundred 429 errors, not the tens of thousands we get without throughput control.
Here are the client documents:
[
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA0575a16f-4e17-4c5a-8d4e-e7b99e46077d",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"0e06ddc3-0000-1100-0000-673606cd0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T13:29:25.649294Z",
"loadFactor": 0.1,
"allocatedThroughput": 357.59729776277,
"_rid": "fawUALp-HLmeAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmeAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731593933
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA91ad6c1e-92a9-4db9-92ec-b940a7ee6f9a",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"0e06d9c3-0000-1100-0000-673606cd0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T13:29:25.754977Z",
"loadFactor": 0.1,
"allocatedThroughput": 357.59729776277,
"_rid": "fawUALp-HLmfAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmfAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731593933
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA0fed1b45-bfd5-4522-b8b4-36fd48f8ba7e",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"0e0619c5-0000-1100-0000-673606cf0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T14:02:53.035802Z",
"loadFactor": 1.146207023576095,
"allocatedThroughput": 12285.190558949238,
"_rid": "fawUALp-HLmiAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmiAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731593935
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA03deee1b-89b2-4a28-95a1-9ec2de0eb988",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"0e0606c5-0000-1100-0000-673606cf0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T14:02:53.175663Z",
"loadFactor": 1.2661901147614698,
"allocatedThroughput": 13029.794957771293,
"_rid": "fawUALp-HLmjAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmjAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731593935
}
]
@golfalot thank you for sharing the information. First, can you try switching to dedicated container throughput instead of shared database throughput? Shared throughput does have some edge-case issues, and there should be less need for it now that the autoscale max can be set as low as 1,000 RU/s (a 100 RU/s minimum per container). I just want to rule out issues with, or confirm you see the same results on, dedicated RUs before recommending or investigating anything further.
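For reference, a minimal sketch of creating a container with dedicated autoscale throughput via the azure-cosmos Python SDK (endpoint, key, container name and partition key path below are placeholders, not taken from your setup):

from azure.cosmos import CosmosClient, PartitionKey, ThroughputProperties

# placeholders - substitute your real endpoint/key and names
client = CosmosClient("https://<account>.documents.azure.com:443/", "<account-key>")
database = client.get_database_client("<redacted>")

# dedicated (container-level) autoscale throughput with a 10k RU/s max
database.create_container_if_not_exists(
    id="pricePaidFixedRUv2_dedicated",
    partition_key=PartitionKey(path="/id"),
    offer_throughput=ThroughputProperties(auto_scale_max_throughput=10000),
)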
Following on with the unintuitive approach, .option('spark.cosmos.throughputControl.targetThroughput', 24000) (a ~14% decrease from 28000) achieves an average of 5.5k RU/s with no 429 errors.
[
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA0575a16f-4e17-4c5a-8d4e-e7b99e46077d",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"1606ecd0-0000-1100-0000-673612be0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T13:29:25.649294Z",
"loadFactor": 0.1,
"allocatedThroughput": 353.19836864830756,
"_rid": "fawUALp-HLmeAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmeAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731596990
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtOTUwMA91ad6c1e-92a9-4db9-92ec-b940a7ee6f9a",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"1606b6d0-0000-1100-0000-673612be0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T13:29:25.754977Z",
"loadFactor": 0.1,
"allocatedThroughput": 333.71436372805505,
"_rid": "fawUALp-HLmfAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmfAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731596990
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA0fed1b45-bfd5-4522-b8b4-36fd48f8ba7e",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"160684d1-0000-1100-0000-673612bf0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T14:02:53.035802Z",
"loadFactor": 0.1,
"allocatedThroughput": 1041.0057181213276,
"_rid": "fawUALp-HLmiAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmiAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731596991
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjgwMDA03deee1b-89b2-4a28-95a1-9ec2de0eb988",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"16068ed1-0000-1100-0000-673612bf0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T14:02:53.175663Z",
"loadFactor": 0.1,
"allocatedThroughput": 1041.0057181213276,
"_rid": "fawUALp-HLmjAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmjAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731596991
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjQwMDAfa7bd8d1-28c7-4dc5-868f-63e9f18aaf12",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"1606ded0-0000-1100-0000-673612be0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T14:43:02.051361Z",
"loadFactor": 1.2365507777945268,
"allocatedThroughput": 11033.626546555519,
"_rid": "fawUALp-HLmmAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmmAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731596990
},
{
"id": "cmF0aW5nLWRhdGFzZXRzLWJ5LXNvdXJjZS0wMS0yNS9wcmljZVBhaWRGaXhlZFJVdjIvcHJpY2VQYWlkX0xvYWRlcl9Kb2JzX0dyb3VwL3QtMjQwMDA53ea2b2e-48b6-4b0a-93ed-98724acf433d",
"groupId": "rating-datasets-by-source-01-25/pricePaidFixedRUv2/pricePaid_Loader_Jobs_Group.client",
"_etag": "\"1606ced0-0000-1100-0000-673612be0000\"",
"ttl": 11,
"initializeTime": "2024-11-14T14:43:02.116834Z",
"loadFactor": 1.0531558695936143,
"allocatedThroughput": 9274.669043653816,
"_rid": "fawUALp-HLmnAAAAAAAAAA==",
"_self": "dbs/fawUAA==/colls/fawUALp-HLk=/docs/fawUALp-HLmnAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1731596990
}
]
Hi @TheovanKraay, good idea. Here are the latest findings:
On database-scoped provisioned autoscale, we increased from 31k to 40k, but still only achieved 7.5k RU/s on the container with .option('spark.cosmos.throughputControl.targetThroughput', 40000).
On per-container autoscale with a 10k max, we can indeed now reach the 10k autoscale ceiling, so that's a win! The only downside is a brief period where it overshot and caused 1.37k 429s in a one-minute window before throttling back a little, with .option('spark.cosmos.throughputControl.targetThroughput', 9500).
We're going to experiment with the targetThroughput value to try to prevent the overshoot... A bit later:
We've also noticed another interesting behaviour: when running all these tests in an interactive notebook session in Synapse, the client documents/items in the throughput control container from previous runs continue to exist. Not until the Spark session is stopped does the TTL get honoured and they vanish.
Still getting a fistful of 429s with .option('spark.cosmos.throughputControl.targetThroughput', 9000), where it bumps into the 10k limit for a couple of minutes, but RU usage does settle thereafter.
@golfalot per the docs, throughput control doesn't pre-calculate the RU cost of each operation, because it is implemented client-side. Instead, it tracks RU usage after each operation based on the response header. So it's an approximation, and it doesn't guarantee that that amount of throughput is available for the group at any given time. The kind of throttling anomaly you describe is more or less expected; to eliminate it, you would probably need to set a lower limit. The docs in the throughput control container are metadata only, and should be getting updated (and TTL-purged) on each new run.
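If an absolute target keeps overshooting, another knob worth noting is the fractional targetThroughputThreshold (the same setting visible in your .config document above), which tracks a percentage of whatever throughput is currently provisioned rather than a fixed number. A minimal sketch, keeping the rest of your options unchanged:

(
    df_to_write
    .write
    .mode("APPEND")
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "CosmosDB_GraphQL_rating_datasets_by_source_01_to_25")
    .option("spark.cosmos.container", 'pricePaidFixedRUv2')
    .option('spark.cosmos.throughputControl.enabled', 'true')
    .option('spark.cosmos.throughputControl.name', 'pricePaid_Loader_Jobs_Group')
    .option('spark.cosmos.throughputControl.globalControl.database', '<redacted>')
    .option('spark.cosmos.throughputControl.globalControl.container', 'ThroughputControl')
    .option('spark.cosmos.throughputControl.accountKey', TokenLibrary.getSecretWithLS("<redacted>", "read-write-cosmos-elastic-reference-data-uks"))
    # 0.9 = aim for ~90% of the container's currently provisioned/autoscale throughput
    .option('spark.cosmos.throughputControl.targetThroughputThreshold', '0.9')
    .save()
)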
@TheovanKraay understood about the overshoot, and the reasons for it.
Re: "The docs in the throughput control container are metadata only, and should be getting updated (and TTL-purged) on each new run."
We can confirm that is definitely not the case with an interactive session running multiple Cosmos jobs on the same Spark session.
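For anyone trying to reproduce, this is roughly how we inspect the lingering client documents between runs (a sketch using the azure-cosmos Python SDK; the endpoint, key and database name are placeholders):

from azure.cosmos import CosmosClient

# placeholders - the real endpoint/key come from Key Vault / the linked service
client = CosmosClient("https://<account>.documents.azure.com:443/", "<account-key>")
control = client.get_database_client("<redacted>").get_container_client("ThroughputControl")

# list the .client documents for the throughput control group; entries from earlier runs
# keep reappearing here until the interactive Spark session is stopped
query = (
    "SELECT c.id, c.groupId, c.loadFactor, c.allocatedThroughput, c.ttl, c._ts "
    "FROM c WHERE CONTAINS(c.groupId, '.client')"
)
for doc in control.query_items(query=query, enable_cross_partition_query=True):
    print(doc)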
Thanks for your prompt responses today, we are in a much better place thanks to you!
I wish to use the latest version of the Spark Cosmos connector, but it appears the default Spark pool configuration in Azure Synapse already includes a package that provides
.format("cosmos.oltp")
by default. As far as I can establish, that package is
azure-cosmos-analytics-spark_3-4_2-12_synapse-2.1.1.jar
I want to use the latest and greatest
azure-cosmos-spark_3-4_2-12-4.34.0.jar
but I am unable to configure the Spark session with the default package removed.
spark.jars.excludes
won't remove it, probably because it's a default. With the newer package added, how can I be certain that it is the one being used when I run the Cosmos write job?
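One way I can think of to sanity-check which jar actually serves the class (a sketch; the class name com.azure.cosmos.spark.CosmosItemsDataSource is my assumption based on the open-source azure-cosmos-spark repo, and this only reflects the driver-side classloader):

# ask the driver JVM where the connector's data source class is loaded from
clazz = spark._jvm.java.lang.Class.forName("com.azure.cosmos.spark.CosmosItemsDataSource")
location = clazz.getProtectionDomain().getCodeSource().getLocation()
print(location)  # should point at either the Synapse-bundled jar or azure-cosmos-spark_3-4_2-12-4.34.0.jar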
Background:
We're using ThroughputControl with targetThroughput but only achieving about 25% of the target, though we can get much higher throughput on that container without ThroughputControl.
Using the very latest spark connector feels like the right thing to pursue given how far out of date the default one is.
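For completeness, this is roughly what I'd expect the same throughput-controlled write to look like against the OSS connector, based on the azure-cosmos-spark configuration reference (untested on our side; endpoint/key/database replace the Synapse linked service and are placeholders):

(
    df_to_write
    .write
    .mode("APPEND")
    .format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
    .option("spark.cosmos.accountKey", TokenLibrary.getSecretWithLS("<redacted>", "read-write-cosmos-elastic-reference-data-uks"))
    .option("spark.cosmos.database", "<redacted>")
    .option("spark.cosmos.container", "pricePaidFixedRUv2")
    .option("spark.cosmos.throughputControl.enabled", "true")
    .option("spark.cosmos.throughputControl.name", "pricePaid_Loader_Jobs_Group")
    .option("spark.cosmos.throughputControl.globalControl.database", "<redacted>")
    .option("spark.cosmos.throughputControl.globalControl.container", "ThroughputControl")
    .option("spark.cosmos.throughputControl.targetThroughput", "9500")
    .save()
)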