Open TXAggie2000 opened 8 hours ago
A few questions:
Thanks for responding, Chris. Yes, in this case ADF is Azure Data Factory. Here is the relevant code you asked for. It is pretty vanilla and taken straight from the sample:
@myApp.route(route="orchestrators/unnest_json_async")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    try:
        mapping_payload = req.get_json()
    except Exception as e:
        # HttpResponse expects a str or bytes body, so convert the exception
        return func.HttpResponse(str(e), status_code=400)
    instance_id = await client.start_new('unnest_orchestrator', None, mapping_payload)
    response = client.create_check_status_response(req, instance_id)
    return response
# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def unnest_orchestrator(context):
    print(context)
    mapping = context.get_input()
    result1 = yield context.call_activity("unnest", mapping)
    return result1
I am running this in South Central US and here is an instance id: d48accb6e91344f6a16606d642cccec4
Sorry Chris, here is the activity code as well:
# Activity
@myApp.activity_trigger(input_name="mapping")
def unnest(mapping: dict):
    logging.info('Python Unnest JSON function processing a request.')
    logging.getLogger("azure").setLevel(logging.WARNING)
    if 'storage_account' in mapping and \
       'container' in mapping and \
       'source_directory' in mapping and \
       'target_directory' in mapping and \
       'nesting_spec' in mapping:
        storage_account = mapping['storage_account']
        container_name = mapping['container']
        source_directory = mapping['source_directory']
        target_directory = mapping['target_directory']
        nesting_spec = mapping['nesting_spec']
        default_credential = DefaultAzureCredential(logging_enable=False)
        account_url = f"https://{storage_account}.blob.core.windows.net"
        blob_service_client = BlobServiceClient(account_url, credential=default_credential, logging_enable=False)
        container_client = blob_service_client.get_container_client(container_name)
        blob_list = container_client.list_blobs(name_starts_with=source_directory)
        data = []
        for blob in blob_list:
            blob_name = blob.name
            if blob_name.endswith(".json"):
                blob_client = container_client.get_blob_client(blob_name)
                blob_data = blob_client.download_blob().readall()
                content = blob_data.decode('utf-8-sig')
                for line_number, line in enumerate(content.splitlines(), start=1):
                    line = line.strip()
                    if line:
                        try:
                            json_object = json.loads(line)
                            data.append(json_object)
                        except json.JSONDecodeError as e:
                            logging.error(f'JSON Decoding error while processing file {blob_name}')
                            return {
                                "status": "Error",
                                "message": f"JSON Decoding error while processing file {blob_name}: {e.msg}"
                            }
        if len(data) >= 1:
            new_spec = {
                "collection_reference": "",
                "nesting_spec": nesting_spec
            }
            unnested_result = flatten_json_by_spec(data, new_spec)
        else:
            logging.error('Not able to unnest JSON based on the specs. Returned empty result.')
            return {
                "status": "Error",
                "message": "Not able to unnest JSON based on the specs. Returned empty result."
            }
        output = {
            "items": unnested_result
        }
        if target_directory.endswith('/'):
            target_directory = target_directory.rstrip('/')
        blob_name = f"{target_directory}/{uuid.uuid4()}.json"
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
        blob_client.upload_blob(json.dumps(output), blob_type="BlockBlob", overwrite=True)
        return {
            "status": "Success",
            "message": f"Successfully unnested array. File created at {container_name}{blob_name}"
        }
    else:
        return {
            "status": "Error",
            "message": "Missing required values in order to parse and unnest json..."
        }
Hmm...yeah, everything looks good from a code perspective. I wonder if something else is afoot. Have you had a look through the Durable Functions troubleshooting guide? https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-troubleshooting-guide. There is some automated analysis that might be able to help root cause the problem. Take a look at the suggestions in the guide and respond back if you're still stuck.
I have a fairly simple process that we are using Durable Functions for. We read in a JSON file, flatten the inner arrays per a specification, and write a new file back to blob storage. There is just one activity. What I have noticed when calling this from ADF is that the file gets written, which is the last step of the activity before the return, yet the function continues to report a status of 'Running' for quite a while afterward. This function does NOT return a large payload; it returns this:
return { "status": "Success", "message": f"Successfully unnested array. File created at {container_name}{blob_name}" }
As an example, here are the timestamps when the function finally returned 'Completed':
The file was written at 2024-11-21T20:26:00Z. My ADF pipeline sleeps for 30 seconds and checks the status again. Why would the function continue for 30 minutes after the actual task was complete?
Thanks, Scott
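For anyone trying to reproduce the delay outside ADF, here is a minimal sketch of the poll-and-compare loop described above. It assumes the status payload is the JSON returned by the `statusQueryGetUri` from the check-status response, with `runtimeStatus` and `lastUpdatedTime` fields. The helper names (`poll_until_done`, `completion_lag_seconds`, `parse_utc`) and the sample payloads are hypothetical and made up for illustration; they are not taken from the real instance. In real use, `fetch_status` would perform the HTTP GET and `sleep` would be `time.sleep`.

```python
from datetime import datetime, timezone

# Statuses after which the orchestration will not change again.
TERMINAL_STATUSES = {"Completed", "Failed", "Terminated"}


def parse_utc(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp such as '2024-11-21T20:26:00Z' as UTC."""
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def poll_until_done(fetch_status, sleep, interval_seconds=30, max_polls=120):
    """Call fetch_status() until runtimeStatus is terminal or max_polls is hit.

    fetch_status: zero-argument callable returning the status payload (dict).
    sleep: callable taking a number of seconds (injected so it can be faked).
    Returns the last payload seen.
    """
    status = fetch_status()
    polls = 1
    while status.get("runtimeStatus") not in TERMINAL_STATUSES and polls < max_polls:
        sleep(interval_seconds)
        status = fetch_status()
        polls += 1
    return status


def completion_lag_seconds(status: dict, file_written_at: str) -> float:
    """Seconds between the output file being written and the last status update."""
    last_updated = parse_utc(status["lastUpdatedTime"])
    return (last_updated - parse_utc(file_written_at)).total_seconds()


# Hypothetical payloads standing in for two successive status checks.
fake_responses = iter([
    {"runtimeStatus": "Running", "lastUpdatedTime": "2024-11-21T20:26:05Z"},
    {"runtimeStatus": "Completed", "lastUpdatedTime": "2024-11-21T20:56:00Z"},
])
final_status = poll_until_done(lambda: next(fake_responses), sleep=lambda s: None)
lag = completion_lag_seconds(final_status, "2024-11-21T20:26:00Z")
print(final_status["runtimeStatus"], lag)
```

Logging the lag per run this way makes it easier to tell whether the gap is consistent or only shows up occasionally, which is useful detail to attach alongside the instance id.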