JulienPeloton closed this issue 1 month ago.
```diff
git diff assets/spark_ztf_transfer.py
diff --git a/assets/spark_ztf_transfer.py b/assets/spark_ztf_transfer.py
index 114590f..e827213 100644
--- a/assets/spark_ztf_transfer.py
+++ b/assets/spark_ztf_transfer.py
@@ -424,7 +424,15 @@ def main(args):
     if args.content == 'Full packet':
         # Cast fields to ease the distribution
         cnames = df.columns
         cnames[cnames.index('timestamp')] = 'cast(timestamp as string) as timestamp'
+        cnames[cnames.index("brokerEndProcessTimestamp")] = (
+            "cast(brokerEndProcessTimestamp as string) as brokerEndProcessTimestamp"
+        )
+        cnames[cnames.index("brokerStartProcessTimestamp")] = (
+            "cast(brokerStartProcessTimestamp as string) as brokerStartProcessTimestamp"
+        )
+        cnames[cnames.index("brokerIngestTimestamp")] = (
+            "cast(brokerIngestTimestamp as string) as brokerIngestTimestamp"
+        )
         cnames[cnames.index('cutoutScience')] = 'struct(cutoutScience.*) as cutoutScience'
         cnames[cnames.index('cutoutTemplate')] = 'struct(cutoutTemplate.*) as cutoutTemplate'
         cnames[cnames.index('cutoutDifference')] = 'struct(cutoutDifference.*) as cutoutDifference'
```
Done in #642
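The patch rewrites `cnames` (the list returned by `df.columns`) in place, replacing selected column names with Spark SQL expression strings. Presumably the list is then passed to something like `df.selectExpr(*cnames)`, but that call lies outside the hunk, so treat it as an assumption. The minimal pure-Python sketch below reproduces the same rewrite with a hypothetical helper `cast_columns` and the column names from the patch. It also makes the failure mode explicit: `list.index` raises `ValueError` as soon as one of the expected names is missing from the schema.

```python
from typing import List

# Column names taken from the patch; the helper below is hypothetical.
TIMESTAMP_COLUMNS = [
    "timestamp",
    "brokerEndProcessTimestamp",
    "brokerStartProcessTimestamp",
    "brokerIngestTimestamp",
]
CUTOUT_COLUMNS = ["cutoutScience", "cutoutTemplate", "cutoutDifference"]


def cast_columns(cnames: List[str]) -> List[str]:
    """Hypothetical helper mirroring the patch: return a copy of `cnames`
    where timestamp columns are cast to string and cutouts are re-wrapped
    as structs. Like the patch, it relies on list.index, so it raises
    ValueError if any expected column is absent."""
    exprs = list(cnames)  # copy so the caller's list is left untouched
    for name in TIMESTAMP_COLUMNS:
        exprs[exprs.index(name)] = f"cast({name} as string) as {name}"
    for name in CUTOUT_COLUMNS:
        exprs[exprs.index(name)] = f"struct({name}.*) as {name}"
    return exprs
```

Calling `cast_columns` on a column list that no longer contains `timestamp` raises `ValueError` on the first lookup, which is exactly the situation described in the issue below.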
Before sending data to Kafka, we format some columns of the dataframe:
https://github.com/astrolabsoftware/fink-science-portal/blob/4abe52850bf9e5e7e7670973ab9079fc4bfeff38/assets/spark_ztf_transfer.py#L424-L434
But this is not up to date with the current schema: namely, `timestamp` no longer exists (see `bin/distribute.py`). More generally, this code should be refactored.
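One possible refactor, sketched here under the assumption that the goal is simply to stop hard-coding one `list.index` lookup per column: iterate over the columns the dataframe actually has and rewrite only those listed in a mapping. Columns that are absent, such as the dropped `timestamp`, are then skipped instead of raising `ValueError`. The mapping `CAST_TEMPLATES` and the function `build_select_exprs` are hypothetical names; the column names come from the patch, and `objectId` is only an illustrative pass-through column.

```python
from typing import Dict, List

# Hypothetical mapping: column name -> Spark SQL expression template.
# Listing a column that is absent from the schema is harmless.
CAST_TEMPLATES: Dict[str, str] = {
    "timestamp": "cast({c} as string) as {c}",
    "brokerEndProcessTimestamp": "cast({c} as string) as {c}",
    "brokerStartProcessTimestamp": "cast({c} as string) as {c}",
    "brokerIngestTimestamp": "cast({c} as string) as {c}",
    "cutoutScience": "struct({c}.*) as {c}",
    "cutoutTemplate": "struct({c}.*) as {c}",
    "cutoutDifference": "struct({c}.*) as {c}",
}


def build_select_exprs(
    columns: List[str], templates: Dict[str, str] = CAST_TEMPLATES
) -> List[str]:
    """Return one expression per existing column: rewritten if the column
    has a template, passed through unchanged otherwise. Names that appear
    only in `templates` are simply never visited."""
    return [templates[c].format(c=c) if c in templates else c for c in columns]


if __name__ == "__main__":
    print(build_select_exprs(["objectId", "brokerIngestTimestamp", "cutoutScience"]))
    # ['objectId', 'cast(brokerIngestTimestamp as string) as brokerIngestTimestamp', 'struct(cutoutScience.*) as cutoutScience']
```

Keeping the expressions in a single table means a schema change only touches the mapping, not the control flow. Selecting timestamp columns by their Spark data type rather than by name would be another option, not shown here.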