voxel51 / fiftyone

The open-source tool for building high-quality datasets and computer vision models
https://fiftyone.ai
Apache License 2.0
8.13k stars 541 forks source link

[BUG] Operation failure when trying to use dataset.match() to filter by an element in a list contained within a sample field #4571

Closed darrenjkt closed 1 month ago

darrenjkt commented 2 months ago

Describe the problem

I have a sample field, "condition", which is a list that contains strings such as "rain", "sunny", etc. When I try to filter samples by this field, I get an error when displaying the resulting view.

Code to reproduce issue

view = dataset.match(F("condition").contains("rain")) # contains_str also throws the same error.
view

The following also throws the exact same error:

view = dataset.filter_field("condition", F().contains("rain"))
view

System information

Other info/logs

Error log:

---------------------------------------------------------------------------
OperationFailure                          Traceback (most recent call last)
File /usr/local/lib/python3.8/dist-packages/IPython/core/formatters.py:708, in PlainTextFormatter.__call__(self, obj)
    701 stream = StringIO()
    702 printer = pretty.RepresentationPrinter(stream, self.verbose,
    703     self.max_width, self.newline,
    704     max_seq_length=self.max_seq_length,
    705     singleton_pprinters=self.singleton_printers,
    706     type_pprinters=self.type_printers,
    707     deferred_pprinters=self.deferred_printers)
--> 708 printer.pretty(obj)
    709 printer.flush()
    710 return stream.getvalue()

File /usr/local/lib/python3.8/dist-packages/IPython/lib/pretty.py:410, in RepresentationPrinter.pretty(self, obj)
    407                         return meth(obj, self, cycle)
    408                 if cls is not object \
    409                         and callable(cls.__dict__.get('__repr__')):
--> 410                     return _repr_pprint(obj, self, cycle)
    412     return _default_pprint(obj, self, cycle)
    413 finally:

File /usr/local/lib/python3.8/dist-packages/IPython/lib/pretty.py:778, in _repr_pprint(obj, p, cycle)
    776 """A pprint that just redirects to the normal repr function."""
    777 # Find newlines and replace them with p.break_()
--> 778 output = repr(obj)
    779 lines = output.splitlines()
    780 with p.group():

File /usr/local/lib/python3.8/dist-packages/fiftyone/core/collections.py:217, in SampleCollection.__repr__(self)
    216 def __repr__(self):
--> 217     return self.summary()

File /usr/local/lib/python3.8/dist-packages/fiftyone/core/view.py:384, in DatasetView.summary(self)
    375 def summary(self):
    376     """Returns a string summary of the view.
    377 
    378     Returns:
    379         a string summary
    380     """
    381     elements = [
    382         ("Dataset:", self.dataset_name),
    383         ("Media type:", self.media_type),
--> 384         ("Num %s:" % self._elements_str, self.count()),
    385     ]
    387     if self.media_type == fom.GROUP:
    388         elements.insert(2, ("Group slice:", self.group_slice))

File /usr/local/lib/python3.8/dist-packages/fiftyone/core/collections.py:7415, in SampleCollection.count(self, field_or_expr, expr, safe)
   7320 """Counts the number of field values in the collection.
   7321 
   7322 ``None``-valued fields are ignored.
   (...)
   7410     the count
   7411 """
   7412 make = lambda field_or_expr: foa.Count(
   7413     field_or_expr, expr=expr, safe=safe
   7414 )
-> 7415 return self._make_and_aggregate(make, field_or_expr)

File /usr/local/lib/python3.8/dist-packages/fiftyone/core/collections.py:9844, in SampleCollection._make_and_aggregate(self, make, args)
   9841 if isinstance(args, (list, tuple)):
   9842     return tuple(self.aggregate([make(arg) for arg in args]))
-> 9844 return self.aggregate(make(args))

File /usr/local/lib/python3.8/dist-packages/fiftyone/core/collections.py:9537, in SampleCollection.aggregate(self, aggregations)
   9534     pipelines.append(pipeline)
   9536 # Run all aggregations
-> 9537 _results = foo.aggregate(self._dataset._sample_collection, pipelines)
   9539 # Parse batch results
   9540 if batch_aggs:

File /usr/local/lib/python3.8/dist-packages/fiftyone/core/odm/database.py:347, in aggregate(collection, pipelines)
    344     return _do_async_pooled_aggregate(collection, pipelines)
    346 if num_pipelines == 1:
--> 347     result = collection.aggregate(pipelines[0], allowDiskUse=True)
    348     return [result] if is_list else result
    350 return _do_pooled_aggregate(collection, pipelines)

File /usr/local/lib/python3.8/dist-packages/pymongo/collection.py:2696, in Collection.aggregate(self, pipeline, session, let, comment, **kwargs)
   2620 """Perform an aggregation using the aggregation framework on this
   2621 collection.
   2622 
   (...)
   2693     https://mongodb.com/docs/manual/reference/command/aggregate
   2694 """
   2695 with self.__database.client._tmp_session(session, close=False) as s:
-> 2696     return self._aggregate(
   2697         _CollectionAggregationCommand,
   2698         pipeline,
   2699         CommandCursor,
   2700         session=s,
   2701         explicit_session=session is not None,
   2702         let=let,
   2703         comment=comment,
   2704         **kwargs,
   2705     )

File /usr/local/lib/python3.8/dist-packages/pymongo/_csot.py:108, in apply.<locals>.csot_wrapper(self, *args, **kwargs)
    106         with _TimeoutContext(timeout):
    107             return func(self, *args, **kwargs)
--> 108 return func(self, *args, **kwargs)

File /usr/local/lib/python3.8/dist-packages/pymongo/collection.py:2604, in Collection._aggregate(self, aggregation_command, pipeline, cursor_class, session, explicit_session, let, comment, **kwargs)
   2593     kwargs["comment"] = comment
   2594 cmd = aggregation_command(
   2595     self,
   2596     cursor_class,
   (...)
   2601     user_fields={"cursor": {"firstBatch": 1}},
   2602 )
-> 2604 return self.__database.client._retryable_read(
   2605     cmd.get_cursor,
   2606     cmd.get_read_preference(session),  # type: ignore[arg-type]
   2607     session,
   2608     retryable=not cmd._performs_write,
   2609     operation=_Op.AGGREGATE,
   2610 )

File /usr/local/lib/python3.8/dist-packages/pymongo/mongo_client.py:1540, in MongoClient._retryable_read(self, func, read_pref, session, operation, address, retryable, operation_id)
   1535 # Ensure that the client supports retrying on reads and there is no session in
   1536 # transaction, otherwise, we will not support retry behavior for this call.
   1537 retryable = bool(
   1538     retryable and self.options.retry_reads and not (session and session.in_transaction)
   1539 )
-> 1540 return self._retry_internal(
   1541     func,
   1542     session,
   1543     None,
   1544     operation,
   1545     is_read=True,
   1546     address=address,
   1547     read_pref=read_pref,
   1548     retryable=retryable,
   1549     operation_id=operation_id,
   1550 )

File /usr/local/lib/python3.8/dist-packages/pymongo/_csot.py:108, in apply.<locals>.csot_wrapper(self, *args, **kwargs)
    106         with _TimeoutContext(timeout):
    107             return func(self, *args, **kwargs)
--> 108 return func(self, *args, **kwargs)

File /usr/local/lib/python3.8/dist-packages/pymongo/mongo_client.py:1496, in MongoClient._retry_internal(self, func, session, bulk, operation, is_read, address, read_pref, retryable, operation_id)
   1470 @_csot.apply
   1471 def _retry_internal(
   1472     self,
   (...)
   1481     operation_id: Optional[int] = None,
   1482 ) -> T:
   1483     """Internal retryable helper for all client transactions.
   1484 
   1485     :param func: Callback function we want to retry
   (...)
   1494     :return: Output of the calling func()
   1495     """
-> 1496     return _ClientConnectionRetryable(
   1497         mongo_client=self,
   1498         func=func,
   1499         bulk=bulk,
   1500         operation=operation,
   1501         is_read=is_read,
   1502         session=session,
   1503         read_pref=read_pref,
   1504         address=address,
   1505         retryable=retryable,
   1506         operation_id=operation_id,
   1507     ).run()

File /usr/local/lib/python3.8/dist-packages/pymongo/mongo_client.py:2353, in _ClientConnectionRetryable.run(self)
   2351 self._check_last_error(check_csot=True)
   2352 try:
-> 2353     return self._read() if self._is_read else self._write()
   2354 except ServerSelectionTimeoutError:
   2355     # The application may think the write was never attempted
   2356     # if we raise ServerSelectionTimeoutError on the retry
   2357     # attempt. Raise the original exception instead.
   2358     self._check_last_error()

File /usr/local/lib/python3.8/dist-packages/pymongo/mongo_client.py:2491, in _ClientConnectionRetryable._read(self)
   2489 if self._retrying and not self._retryable:
   2490     self._check_last_error()
-> 2491 return self._func(self._session, self._server, conn, read_pref)

File /usr/local/lib/python3.8/dist-packages/pymongo/aggregation.py:162, in _AggregationCommand.get_cursor(self, session, server, conn, read_preference)
    159     write_concern = None
    161 # Run command.
--> 162 result = conn.command(
    163     self._database.name,
    164     cmd,
    165     read_preference,
    166     self._target.codec_options,
    167     parse_write_concern_error=True,
    168     read_concern=read_concern,
    169     write_concern=write_concern,
    170     collation=self._collation,
    171     session=session,
    172     client=self._database.client,
    173     user_fields=self._user_fields,
    174 )
    176 if self._result_processor:
    177     self._result_processor(result, conn)

File /usr/local/lib/python3.8/dist-packages/pymongo/helpers.py:342, in _handle_reauth.<locals>.inner(*args, **kwargs)
    339 from pymongo.pool import Connection
    341 try:
--> 342     return func(*args, **kwargs)
    343 except OperationFailure as exc:
    344     if no_reauth:

File /usr/local/lib/python3.8/dist-packages/pymongo/pool.py:989, in Connection.command(self, dbname, spec, read_preference, codec_options, check, allowable_errors, read_concern, write_concern, parse_write_concern_error, collation, session, client, retryable_write, publish_events, user_fields, exhaust_allowed)
    987     self._raise_if_not_writable(unacknowledged)
    988 try:
--> 989     return command(
    990         self,
    991         dbname,
    992         spec,
    993         self.is_mongos,
    994         read_preference,
    995         codec_options,
    996         session,
    997         client,
    998         check,
    999         allowable_errors,
   1000         self.address,
   1001         listeners,
   1002         self.max_bson_size,
   1003         read_concern,
   1004         parse_write_concern_error=parse_write_concern_error,
   1005         collation=collation,
   1006         compression_ctx=self.compression_context,
   1007         use_op_msg=self.op_msg_enabled,
   1008         unacknowledged=unacknowledged,
   1009         user_fields=user_fields,
   1010         exhaust_allowed=exhaust_allowed,
   1011         write_concern=write_concern,
   1012     )
   1013 except (OperationFailure, NotPrimaryError):
   1014     raise

File /usr/local/lib/python3.8/dist-packages/pymongo/network.py:212, in command(conn, dbname, spec, is_mongos, read_preference, codec_options, session, client, check, allowable_errors, address, listeners, max_bson_size, read_concern, parse_write_concern_error, collation, compression_ctx, use_op_msg, unacknowledged, user_fields, exhaust_allowed, write_concern)
    210             client._process_response(response_doc, session)
    211         if check:
--> 212             helpers._check_command_response(
    213                 response_doc,
    214                 conn.max_wire_version,
    215                 allowable_errors,
    216                 parse_write_concern_error=parse_write_concern_error,
    217             )
    218 except Exception as exc:
    219     duration = datetime.datetime.now() - start

File /usr/local/lib/python3.8/dist-packages/pymongo/helpers.py:248, in _check_command_response(response, max_wire_version, allowable_errors, parse_write_concern_error)
    245 elif code == 43:
    246     raise CursorNotFound(errmsg, code, response, max_wire_version)
--> 248 raise OperationFailure(errmsg, code, response, max_wire_version)

OperationFailure: PlanExecutor error during aggregation :: caused by :: $in requires an array as a second argument, found: missing, full error: {'ok': 0.0, 'errmsg': 'PlanExecutor error during aggregation :: caused by :: $in requires an array as a second argument, found: missing', 'code': 40081, 'codeName': 'Location40081'}

Willingness to contribute

The FiftyOne Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the FiftyOne codebase?

brimoor commented 1 month ago

@darrenjkt apologies for the delay 😅

The solution here is to use:

view = dataset.exists("condition").match(F("condition").contains("rain"))

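Unfortunately, contains() does not gracefully handle missing/None-valued fields, so we add exists() first to filter out these samples. For a sample where "condition" is missing, the underlying MongoDB $in expression receives no array to search, which is what produces the "$in requires an array as a second argument, found: missing" error shown above.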