airflow-plugins / mongo_plugin

Apache License 2.0

PyMongo Query Doesn't Accept String Date Range #1

Closed: berniechiu closed this issue 6 years ago

berniechiu commented 6 years ago

Hi,

When I was working with a query like this:

      'mongo_query': [{ "$match": { "updated_at": {
                        "$lt": '{{ next_execution_date }}',
                        "$gte": '{{ execution_date }}'
                      } } },
                      { "$project": process_collection('orders')[0]}],

Jinja renders those dates as strings, but PyMongo doesn't seem to recognize string date parameters in an aggregation. Manually converting them to datetime objects seems to work fine. I guess we need some workaround here, since we're passing the rendered Jinja template straight to the query:

    collection = mongo_conn.get_database(self.mongo_db).get_collection(self.mongo_collection)
    results = collection.aggregate(self.mongo_query) if self.is_pipeline else collection.find(self.mongo_query)

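To make the problem concrete: Jinja outputs `{{ execution_date }}` as `str(execution_date)`, so what reaches PyMongo is a plain `str`, not a `datetime`. The exact string depends on the Airflow version (with or without a `T` separator or a UTC offset). Below is a minimal sketch of parsing such a rendered value back into a timezone-aware `datetime`; `rendered_to_datetime` is a hypothetical helper (not part of mongo_plugin or Airflow), and it assumes that naive values are UTC.

```python
from datetime import datetime, timezone


def rendered_to_datetime(value: str) -> datetime:
    """Parse a Jinja-rendered date string back into an aware datetime.

    Accepts str(datetime) output ('2018-04-14 00:00:00'), ISO output
    ('2018-04-14T00:00:00', optionally with an offset) and a trailing 'Z'.
    """
    text = value.strip()
    if text.endswith("Z"):
        # fromisoformat() only accepts 'Z' on Python 3.11+, so normalise it.
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # Assumption: naive timestamps rendered by Airflow represent UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


# Roughly what '{{ execution_date }}' renders to for a naive datetime.
rendered = str(datetime(2018, 4, 14))
print(type(rendered).__name__, rendered)  # str 2018-04-14 00:00:00
print(rendered_to_datetime(rendered))     # 2018-04-14 00:00:00+00:00
```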
andscoop commented 6 years ago

@berniechiu What format are the datetimes in your Mongo instance? Mongo allows comparisons on ISO-format strings, so I have had success with something like the following:

    LOWER_BOUND = "{{ prev_execution_date.isoformat() + 'Z' if prev_execution_date != None else ts + 'Z' }}"
    UPPER_BOUND = "{{ ts + 'Z' }}"

These should match the Mongo date formats described here.
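Why ISO strings can work as bounds: if `updated_at` is stored as an ISO-8601 *string*, then values that share one fixed-width format sort lexicographically in chronological order, so `$gte`/`$lt` with string bounds select the right window. MongoDB's comparison operators generally only match values of the same BSON type, so string bounds will not match a field stored as a BSON Date, which would be consistent with the empty result reported in the next comment. The sketch below simulates the rendered bounds in plain Python. The dates and stored values are hypothetical, and the `else ts + 'Z'` fallback is omitted.

```python
from datetime import datetime

# Simulate what the templates render for naive (Airflow 1.x-style) datetimes.
prev_execution_date = datetime(2018, 4, 14)
ts = datetime(2018, 5, 15).isoformat()  # '2018-05-15T00:00:00'

lower_bound = prev_execution_date.isoformat() + "Z"  # '2018-04-14T00:00:00Z'
upper_bound = ts + "Z"                               # '2018-05-15T00:00:00Z'

# Hypothetical updated_at values stored as ISO-8601 strings (not BSON dates).
stored = ["2018-04-13T23:59:59Z", "2018-04-20T08:30:00Z", "2018-05-15T00:00:00Z"]

# Plain string comparison, i.e. what {"$gte": lower_bound, "$lt": upper_bound}
# does against string fields: lexicographic order equals chronological order
# only while every value uses the same fixed-width format.
in_window = [s for s in stored if lower_bound <= s < upper_bound]
print(in_window)  # ['2018-04-20T08:30:00Z']
```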

berniechiu commented 6 years ago

Thanks for the help @andscoop 💯

So far I've been following this example directly: https://github.com/airflow-plugins/Example-Airflow-DAGs/blob/master/etl/mongo_to_redshift/mongo_to_redshift.py

I also added the Z at the end, but the query still doesn't return anything when the dates are strings.

Here are my example queries:

    import datetime
    from dateutil.tz import tzutc

    # Doesn't work (the original query, rendered by Jinja with runtime variables)
    self.mongo_query = [{'$match': {'updated_at': {'$lt': '2018-05-15T00:00:00Z', '$gte': '2018-04-14T00:00:00Z'}}}, {'$project': {'_id': '$_id', 'order_number': '$order_number'}}]
    results = collection.aggregate(self.mongo_query)
    self.transform(results)  #=> returns [] (nothing)

    # Then I assign datetime objects manually, and it works
    self.mongo_query = [{'$match': {'updated_at': {'$lt': datetime.datetime(2018, 5, 15, 0, 0, tzinfo=tzutc()), '$gte': datetime.datetime(2018, 4, 14, 0, 0, tzinfo=tzutc())}}}, {'$project': {'_id': '$_id', 'order_number': '$order_number'}}]
    results = collection.aggregate(self.mongo_query)
    self.transform(results)  #=> returns [xxx, xxx, xxx]

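The two queries above differ only in the Python types of the `$lt`/`$gte` values. Here is a minimal sketch that turns the rendered string bounds of the first query into the datetimes of the second. It assumes the rendered format is exactly `%Y-%m-%dT%H:%M:%SZ`, and it swaps in the standard library's `timezone.utc` for dateutil's `tzutc()`. Both represent UTC, and aware datetimes for the same instant compare equal. `parse_bound` is a hypothetical helper.

```python
from datetime import datetime, timezone

ISO_Z = "%Y-%m-%dT%H:%M:%SZ"  # assumed format of the rendered bounds


def parse_bound(value: str) -> datetime:
    # strptime matches the literal 'Z' but yields a naive datetime,
    # so attach UTC explicitly.
    return datetime.strptime(value, ISO_Z).replace(tzinfo=timezone.utc)


match = {"updated_at": {"$lt": "2018-05-15T00:00:00Z", "$gte": "2018-04-14T00:00:00Z"}}
match["updated_at"] = {op: parse_bound(v) for op, v in match["updated_at"].items()}

pipeline = [
    {"$match": match},
    {"$project": {"_id": "$_id", "order_number": "$order_number"}},
]
print(pipeline[0]["$match"]["updated_at"]["$gte"])  # 2018-04-14 00:00:00+00:00
```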
benjamingregory commented 6 years ago

Hey @berniechiu, I've actually run into a similar problem. I didn't add my fix to the OSS version of this plugin because it felt pretty hacky and only solved my use case (it's not a generalized solution). I'm sure there is a better solution, so please feel free to open a PR if you think of one.

The problem is that PyMongo (the library this operator uses to connect to Mongo) doesn't treat strings as dates, so the rendered values have to be converted back to datetime objects after templating.

    def execute(self, context):
        """
        Executed by task_instance at runtime.
        """
        # Requires module-level imports: `import logging` and
        # `from datetime import datetime`.
        # A hard-coded query with datetime objects works directly:
        # self.mongo_query = [{"$match": {"dateLastActivity": {
        #                    "$lt": datetime(2015, 11, 1),
        #                    "$gte": datetime(2015, 10, 1)
        #                    }}}]

        # Hacky workaround for querying a timestamp.
        # The templated field is rendered to a timestamp string, which in turn
        # needs to be converted back to a datetime object before being passed
        # to pymongo. This can't happen before templating, because Airflow
        # templates render to strings rather than datetime objects.
        logging.info(self.mongo_query)

        # Only aggregation pipelines (lists of stages) are handled; a plain
        # find() filter is a dict and is left untouched.
        if isinstance(self.mongo_query, list):
            for stage in self.mongo_query:
                for query, params in stage.items():
                    if query == '$match':
                        for k, v in params.items():
                            if k == 'dateLastActivity':
                                v['$lt'] = datetime.strptime(v['$lt'], '%Y-%m-%d %H:%M:%S')
                                v['$gte'] = datetime.strptime(v['$gte'], '%Y-%m-%d %H:%M:%S')
                                logging.info('%s %s', v['$lt'], v['$gte'])
        mongo_conn = MongoHook(self.mongo_conn_id).get_conn()
        s3_conn = S3Hook(self.s3_conn_id)
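One direction a more general version of the workaround above could take: recursively walk the rendered query and convert any ISO-looking string found under `$lt`, `$lte`, `$gt` or `$gte` into a timezone-aware `datetime`, for any field and any stage. This is a sketch under stated assumptions, not plugin API. `convert_date_bounds` and `RANGE_OPS` are hypothetical names, naive values are assumed to be UTC, and it would be called in `execute()` before `aggregate()`/`find()`.

```python
import re
from datetime import datetime, timezone
from typing import Any

# Hypothetical helper, not part of mongo_plugin: converts rendered date
# strings under range operators into aware datetimes after templating.
RANGE_OPS = {"$lt", "$lte", "$gt", "$gte"}
ISO_LIKE = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")


def _to_datetime(text: str) -> datetime:
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)  # raises ValueError if unparseable
    # Assumption: naive values are UTC.
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def convert_date_bounds(node: Any) -> Any:
    """Return a copy of a find() filter or pipeline with date bounds converted."""
    if isinstance(node, dict):
        out = {}
        for key, value in node.items():
            if key in RANGE_OPS and isinstance(value, str) and ISO_LIKE.match(value):
                out[key] = _to_datetime(value)
            else:
                out[key] = convert_date_bounds(value)
        return out
    if isinstance(node, list):
        return [convert_date_bounds(item) for item in node]
    return node


# Usage with both rendered styles seen in this thread.
query = [
    {"$match": {"updated_at": {"$lt": "2018-05-15 00:00:00", "$gte": "2018-04-14T00:00:00Z"}}},
    {"$project": {"_id": "$_id", "order_number": "$order_number"}},
]
converted = convert_date_bounds(query)
```

Returning a copy instead of mutating in place keeps the rendered `mongo_query` available for logging. One caveat: this would also convert bounds that are meant to stay strings, for example when dates are stored as ISO strings as in the earlier suggestion, so a real version would likely need an opt-in list of fields.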