bbuechler opened 1 year ago
A little bit of follow-up.

I've been working with this a bunch. I was getting the `'NoneType' object is not subscriptable` error because the Dynamic Frame wasn't getting any data. This was because, by default, Dynamic Frames somehow only look at the top level and don't recurse into sub-prefixes/directories. So even though the Athena table was built around `s3://bucket/path/`, it wasn't picking up objects at `s3://bucket/path/sub-path/log_file.ext`. When that happened, `data_frame` had no payload and the partition write-out was skipped. Since this was the initial run, nothing had ever been written to the output bucket.
The next step in the `s3_access_job.py` workflow's `job.convert_and_partition()` call is `add_new_optimized_partitions()`. This calls `self.optimized_catalog.get_and_create_partitions()`, followed by `self.partitioner.build_partitions_from_s3()`, and finally `s3_reader.get_first_hivecompatible_date_in_prefix()`. That last call is the one failing in my original post, since it can't find any partitions AT ALL because the initial population was aborted.
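For anyone hitting the same traceback, here's a hypothetical illustration (not the project's actual helper code) of how an empty output prefix turns into that exact error: when nothing has ever been written, `list_objects_v2` returns no `Contents` key at all, and subscripting the missing result is what raises the TypeError.

```python
import boto3

s3 = boto3.client("s3")

# Hypothetical illustration only -- bucket/prefix names are placeholders, and the real
# helper's query parameters may differ.
response = s3.list_objects_v2(Bucket="output-bucket", Prefix="converted/path/")

contents = response.get("Contents")  # None when the prefix holds no objects yet
if contents is None:
    # Subscripting None here (e.g. contents[0]["Key"]) is what produces
    # "TypeError: 'NoneType' object is not subscriptable".
    print("No partitions found under the prefix")
```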
Based on this Re:Post Question, I added the `additional_options = {"recurse": True}` value to my Dynamic Frame and it JustWorked™. I updated my workflow to add a `--recursive` parameter that I pass down into the `DataConverter` class and parse into True/False, since sometimes I don't want recursion.
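In case it helps, here's a rough sketch of the argument wiring, assuming the standard `getResolvedOptions` helper; the actual plumbing in my job script differs a bit:

```python
import sys

from awsglue.utils import getResolvedOptions

# Sketch only: parse a --recursive job parameter into a real boolean.
args = getResolvedOptions(sys.argv, ["JOB_NAME", "recursive"])
recursive = args["recursive"].lower() in ("true", "1", "yes")

# The flag eventually lands in the DynamicFrame read as additional_options.
additional_options = {"recurse": True} if recursive else {}
```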
I also changed `DataConverter.run()` and `job.trigger_conversion()` to return `True`/`None` based on whether any new data was actually converted, and to skip `add_new_optimized_partitions()` if we didn't add any new data.
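Roughly, the control flow now looks like the sketch below; the method names are from the project, but the body is a simplification, not the real implementation:

```python
# Simplified sketch of the gating logic; the real methods return richer objects.
def convert_and_partition(job) -> None:
    converted_something = job.trigger_conversion()  # now returns True or None
    if not converted_something:
        print("No new data converted; skipping add_new_optimized_partitions()")
        return
    job.add_new_optimized_partitions()
```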
Ultimately, though, I may be totally SOL on this project. When the data gets optimized into Parquet files, there is no backwards traceability to the original source log document. I tried a bunch of methods to include the raw Athena table's `"$PATH"` in the converted/optimized Athena table and couldn't find a way. I tried using `spark.sql()` to re-write the Dynamic Frame to include `$PATH`, and I tried creating an Athena view, but DynamicFrames can't read views. I spent days trying to coax out a solution, but much like this StackOverflow Post, I came up empty.
Hi @bbuechler - Thanks for opening this issue and providing so much information. Apologies I haven't been able to respond until now.
For S3 Access Logs, the tool does assume that they're in their original location as written to by the S3 service. In other words, not organized under a sub-folder or partition. I'm glad you were able to get it to work, but yes, there's no sort of lineage tracking. I presume you could potentially add `input_file_name` to the Glue code to extract the filename (the Athena `"$PATH"` wouldn't work since the conversion code is Glue).

Unfortunately I haven't been able to maintain this project since publishing - it'd be nice to update for newer versions of Glue or make the library more generic so it could run anywhere, or even just make it Athena CTAS/INSERT INTO. But alas, not able to at the moment.
@dacort No worries, I understand the loss of traction. This was an awesome project to help me learn a bunch of the mechanics of Glue and PySpark, something I previously had no experience with. Hopefully the `{"recurse": True}` tip might help someone else get their workflow moving.

I had considered `input_file_name()`, but I interpreted "file name of the current Spark task" as the Python executable name. 🤦 I moved right past it instead of trying it. I just tried it. It gave me exactly what I needed. 🫠

Anyway, I appreciate your work, and hopefully my input above can help someone else in the future, even if this project does not continue to evolve.

FWIW, I've been running my ingests as Glue 3 to take advantage of autoscaling. Seems to be working without a hitch. Glue 4 gave me a deprecation warning, but seemed to execute just fine.
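For reference, here's a sketch of how Auto Scaling gets switched on for a Glue 3.0 job via boto3; the job name, role, and script location below are placeholders, not my actual config:

```python
import boto3

glue = boto3.client("glue")

# Sketch with placeholder names only. With Glue 3.0+, Auto Scaling is enabled via the
# --enable-auto-scaling job argument, and NumberOfWorkers acts as the upper bound.
glue.update_job(
    JobName="s3-access-log-converter",
    JobUpdate={
        "Role": "arn:aws:iam::123456789012:role/GlueJobRole",
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": "s3://my-bucket/scripts/s3_access_job.py",
            "PythonVersion": "3",
        },
        "GlueVersion": "3.0",
        "WorkerType": "G.1X",
        "NumberOfWorkers": 10,
        "DefaultArguments": {"--enable-auto-scaling": "true"},
    },
)
```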
Awesome, glad to hear you got it working! S3 Access Logs, given their legacy, can be quite persnickety. :)
Super glad to hear it's been helpful to you and thanks for the kudos. 🙏
One final note for anyone following in my footsteps. The `input_file_name()` function does not work if file grouping is used for processing efficiency, and file grouping is automatically enabled when processing more than 50,000 objects. This was a frustrating and confusing feature to discover during larger-scale log ingest testing. With file grouping enabled, `input_file_name()` returns an empty value. If traceability is more important than throughput and cost optimization, grouping can be disabled in the `additional_options` block. This block is from the `run()` function of the `DataConverter` class:
```python
# Retrieve the source data from the Glue catalog
source_data = self.glue_context.create_dynamic_frame.from_catalog(
    database=self.data_catalog.get_database_name(),
    table_name=self.data_catalog.get_table_name(),
    transformation_ctx="source_data",
    additional_options={
        "recurse": True,       # RECURSIVE!
        "groupFiles": "none",  # Performance hit to ensure input_file_name() works
    },
)
```
To inject the log file name, I just add it after the Dynamic Frame is converted to a Data Frame, also within `converter.py`:
```python
from pyspark.sql.functions import input_file_name

# ... after the DynamicFrame has been converted to a DataFrame ...
data_frame = data_frame.withColumn("log_object", input_file_name())
```
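A possible follow-on (my own variation, not something from the project): `input_file_name()` gives the full `s3://...` URI, so if only the object name is wanted, something like this could trim it down:

```python
from pyspark.sql.functions import input_file_name, regexp_extract

# Full URI of the source object, e.g. s3://bucket/path/sub-path/log_file.ext
data_frame = data_frame.withColumn("log_object", input_file_name())

# Just the final path component (the log file name), via a simple "last segment" regex.
data_frame = data_frame.withColumn(
    "log_object_name", regexp_extract("log_object", r"([^/]+)$", 1)
)
```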
I ran into an issue where we aggregate S3 distribution logs from a variety of sources into one account, and the logs are broken down into sub-prefixes:

I was trying to run a single Glue job for `s3://<log-bucket>/s3_distribution_logs/` to populate all `<deployment-name-sub-prefix>` logs into the same `CONVERTED_TABLE_NAME`. In this case, the `RAW_TABLE_NAME` Athena table was getting populated, the job would initially error with the error below, then on subsequent runs would run "successfully". Unfortunately, I wouldn't get any logs into my `CONVERTED_TABLE_NAME` Athena table.

With continuous logging enabled, and a little tinkering, I tracked the issue down to `_get_first_key_in_prefix()`:

The values going into `self.s3_client.list_objects_v2(**query_params)` were:

from the `glue_jobs.json`:

It's entirely unclear to me why, since I'm VERY new to both this project and Glue in general, but if I supply

instead of:

...it just works. Albeit with a smaller subset of data than I wanted. This may also be related to #30
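Since the actual `query_params` values got lost above, the following is only a guess at what's going on, sketched with placeholder bucket/prefix names: if the listing uses a `Delimiter`, objects that live under deeper sub-prefixes come back as `CommonPrefixes` rather than `Contents`, so the parent prefix looks empty while a specific sub-prefix returns real keys.

```python
import boto3

s3 = boto3.client("s3")

# Guesswork illustration only; the real query_params in _get_first_key_in_prefix() may differ.
parent = s3.list_objects_v2(
    Bucket="log-bucket",
    Prefix="s3_distribution_logs/",
    Delimiter="/",
)
print(parent.get("Contents"))        # None: no objects sit directly under the parent prefix
print(parent.get("CommonPrefixes"))  # the per-deployment sub-prefixes

# Pointing at one sub-prefix returns actual object keys.
child = s3.list_objects_v2(
    Bucket="log-bucket",
    Prefix="s3_distribution_logs/deployment-a/",
)
print(child.get("Contents"))
```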