ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org

feat: Avoid subqueries inside lambdas #8614

Open NickCrews opened 6 months ago

NickCrews commented 6 months ago

Is your feature request related to a problem?

This currently works, because I am using a Map literal:

import ibis

ibis.options.interactive = True

be = ibis.duckdb.connect()
ibis.set_backend(be)

inp = be.create_table("inp", {"chars": [["a", "b"], ["c"]]})
keys = ["a", "b", "c"]
vals = [1, 2, 3]
mapping = ibis.map(keys, vals)
result = inp.mutate(nums=inp.chars.map(mapping.get))
result
# ┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
# ┃ chars         ┃ nums        ┃
# ┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
# │ array<string> │ array<int8> │
# ├───────────────┼─────────────┤
# │ ['a', 'b']    │ [1, 2]      │
# │ ['c']         │ [3]         │
# └───────────────┴─────────────┘

But in actuality, I need to generate the mapping based on data in a different table:

map_table = be.create_table("map_table", {"key": keys, "val": vals}, overwrite=True)
mapping = ibis.map(map_table.key.collect(), map_table.val.collect())
result = inp.mutate(nums=inp.chars.map(mapping.get))
# result.cache()
# InvalidInputException: Invalid Input Error: Subqueries are not supported in lambda expressions!
ibis.to_sql(result)
# SELECT
#   "t0"."chars",
#   LIST_APPLY(
#     "t0"."chars",
#     __ibis_param_key__ -> COALESCE(
#       LIST_EXTRACT(
#         ELEMENT_AT(
#           MAP(
#             (
#               SELECT
#                 ARRAY_AGG("t1"."key") AS "ArrayCollect(key)"
#               FROM "map_table" AS "t1"
#             ),
#             (
#               SELECT
#                 ARRAY_AGG("t1"."val") AS "ArrayCollect(val)"
#               FROM "map_table" AS "t1"
#             )
#           ),
#           __ibis_param_key__
#         ),
#         1
#       ),
#       NULL
#     )
#   ) AS "nums"
# FROM "inp" AS "t0"

Describe the solution you'd like

For this to work, I'm not exactly sure what the best approach is.

Here is some DuckDB SQL that works:

WITH inp as (
  SELECT ['a', 'b'] as chars
  UNION ALL
  SELECT ['c'] as chars
), map_table AS (
  SELECT
    MAP(
      ['a', 'b', 'c', 'd', 'e', 'f'],
      [1, 2, 3, 4, 5, 6]
    ) as map_val
)
SELECT
    LIST_APPLY(inp.chars, x -> map_table.map_val[x][1]) as terms_mapped
FROM
    inp, map_table

It does a cross join, but since map_table is only a single row, I don't think this is a problem?

What version of ibis are you running?

main

What backend(s) are you using, if any?

No response


cpcloud commented 6 months ago

Thanks!

There are a couple problems here, but one is that we can't at the moment avoid generating CTEs in lambdas.

Can you try doing an explicit cross join first in ibis?

NickCrews commented 6 months ago

That does work:

map_table = be.create_table("map_table", {"key": keys, "val": vals}, overwrite=True)
mapping = ibis.map(map_table.key.collect(), map_table.val.collect()).name("mapping")
inp_joined = inp.cross_join(mapping.as_table())
result = inp_joined.mutate(nums=inp_joined.chars.map(inp_joined.mapping.get))
result.cache()

But I am trying to use this in a tfidf library function that I expose with an API of def tfidf(terms: ir.ArrayValue) -> ir.MapValue: ..., which means I can't do any joins on the incoming data: the output has to "line up" with the input, so that someone can write t2 = t.mutate(idfs=idf(t.description.split(" "))). I could change this library's API to accept and return a Table, in which case I can do all the joins and such that I need internally, but that is just an uglier API: t2 = idf(t, terms=t.description.split(" "), result_col="idfs")
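To make those two shapes concrete, here is a rough sketch (the function names, signatures, and bodies are illustrative only, not real mismo APIs):

import ibis.expr.types as ir

# Shape 1 (what I want): column in, column out, so the result lines up with the
# input rows and composes with .mutate(). The catch is that the internal stats
# are computed over all rows, which currently compiles to a subquery inside the
# lambda (the error above).
def idf(terms: ir.ArrayValue) -> ir.MapValue: ...

# t2 = t.mutate(idfs=idf(t.description.split(" ")))

# Shape 2 (the fallback): table in, table out, so any joins can happen inside.
def idf_table(t: ir.Table, *, terms: ir.ArrayValue, result_col: str = "idfs") -> ir.Table: ...

# t2 = idf_table(t, terms=t.description.split(" "), result_col="idfs")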

Any tips here?

NickCrews commented 6 months ago

Actually, the API above isn't very composable. I have this function:

import ibis
import ibis.expr.datatypes as dt
import ibis.expr.types as ir
from ibis import _


def add_array_value_counts(
    t: ir.Table, column: str, *, result_name: str = "{name}_counts"
) -> ir.Table:
    r"""value_counts() for ArrayColumns.

    Parameters
    ----------
    t : Table
        The input table.
    column : str
        The name of the array column to analyze.
    result_name : str, optional
        The name of the resulting column. The default is "{name}_counts".

    Examples
    --------
    >>> import ibis
    >>> from mismo.text import add_array_value_counts
    >>> ibis.options.interactive = True
    >>> terms = [
    ...     None,
    ...     ["st"],
    ...     ["st"],
    ...     ["12", "main", "st"],
    ...     ["99", "main", "ave"],
    ...     ["56", "st", "joseph", "st"],
    ...     ["21", "glacier", "st"],
    ...     ["12", "glacier", "st"],
    ... ]
    >>> t = ibis.memtable({"terms": terms})
    >>> add_array_value_counts(t, "terms")
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
    ┃ terms                        ┃ terms_counts                     ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
    │ array<string>                │ map<string, int64>               │
    ├──────────────────────────────┼──────────────────────────────────┤
    │ ['st']                       │ {'st': 1}                        │
    │ ['st']                       │ {'st': 1}                        │
    │ ['12', 'main', 'st']         │ {'st': 1, '12': 1, 'main': 1}    │
    │ ['99', 'main', 'ave']        │ {'ave': 1, '99': 1, 'main': 1}   │
    │ ['56', 'st', 'joseph', 'st'] │ {'56': 1, 'joseph': 1, 'st': 2}  │
    │ ['21', 'glacier', 'st']      │ {'glacier': 1, 'st': 1, '21': 1} │
    │ ['12', 'glacier', 'st']      │ {'glacier': 1, 'st': 1, '12': 1} │
    │ NULL                         │ NULL                             │
    └──────────────────────────────┴──────────────────────────────────┘
    """  # noqa: E501
    terms = t[column]
    # Add id before unnesting so we always get an id per row
    t = t.mutate(__id=ibis.row_number())
    normalized = t.mutate(
        __term=terms.filter(lambda t: t.notnull()).unnest(),
    )
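    # Count how many times each term appears within each original row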
    counts = normalized.group_by(["__id", "__term"]).agg(
        __n=_.count(),
    )
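    # Collapse back to one row per __id, building a term -> count map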
    by_terms = counts.group_by("__id").agg(
        __result=ibis.map(keys=_.__term.collect(), values=_.__n.collect()),
    )
    result = t.left_join(by_terms, "__id").drop("__id", "__id_right")

    # annoying logic to deal with the edge case of an empty array
    term_type = terms.type().value_type
    counts_type = dt.Map(key_type=term_type, value_type=dt.int64)
    empty = ibis.literal({}, counts_type)
    result = result.mutate(__result=(_[column].length() == 0).ifelse(empty, _.__result))
    return result.rename({result_name.format(name=column): "__result"})

This works fine if I have a single array<something> I want to operate on. But as soon as that ArrayColumn is nested inside another complex dtype, such as array<array<string>> (I get this because a person has multiple addresses, so someone's address tokens might be [["12", "main", "st"], ["45", "oak", "ave"]]), it becomes very hard to apply this function to each sub-element.
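For example, a rough sketch of the nested case (made-up data, just to illustrate the shape):

import ibis

# One row per person; each person has several tokenized addresses, so the
# column type is array<array<string>>.
people = ibis.memtable(
    {
        "address_tokens": [
            [["12", "main", "st"], ["45", "oak", "ave"]],
            [["99", "main", "ave"]],
        ]
    }
)

# add_array_value_counts() relies on unnest()/group_by(), which are table-level
# operations, so there is no obvious way to apply it to each sub-array, e.g.
# inside an element-wise lambda like:
#   people.mutate(counts=people.address_tokens.map(lambda addr: ...))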

So I would very much like to make it so that the explicit cross join is not needed.