machow / siuba

Python library for using dplyr like syntax with pandas and SQL
https://siuba.org
MIT License
1.14k stars 48 forks source link

Filtering using the .isin() method for bigquery #407

Closed machow closed 2 years ago

machow commented 2 years ago

From @lauriemerrell, moving to a github issue. Can you let me know if this does what you're looking for? :)

Querying using the .isin() method

from siuba.sql.utils import mock_sqlalchemy_engine
from siuba.sql import LazyTbl
from siuba import *
engine = mock_sqlalchemy_engine("bigquery")
test_tbl = LazyTbl(engine, "test_table", ["x", "y", "z", "calitp_itp_id"])

# note that we need to assign this to a variable, or it will error
# but we mostly care about the SQL it prints out
q = test_tbl >> mutate(res = _.x.isin([1,2])) >> show_query()
SELECT test_table.x, test_table.y, test_table.z, test_table.x IN (1, 2) AS res 
FROM test_table

Implementing a new verb

See

Here's an example, building off the code above

# note that this function is named singledispatch2 inside siuba, but is being
# renamed to verb_dispatch..
from siuba.siu.dispatchers import verb_dispatch
from siuba.sql import LazyTbl

from siuba import filter
from siuba import _

@verb_dispatch(LazyTbl)
def filter_calitp_id(__data, ids):
    return filter(__data, _["calitp_itp_id"].isin(ids))

q = test_tbl >> filter_calitp_id([1, 2]) >> show_query()
SELECT anon_1.x, anon_1.y, anon_1.z, anon_1.calitp_itp_id 
FROM (SELECT test_table.x AS x, test_table.y AS y, test_table.z AS z, test_table.calitp_itp_id AS calitp_itp_id 
FROM test_table) AS anon_1 
WHERE anon_1.calitp_itp_id IN (1, 2)
lauriemerrell commented 2 years ago

Thanks @machow -- this does not seem to work for me. I can run the example with mock_sqlalchemy_engine as written but I can't run it from BigQuery.

I think that in mock_sqlalchemy_engine it might always be using postgresql rather than actually using the dialect argument that was passed in?

For example, this does not work for me:

from sqlalchemy.engine import Engine
from sqlalchemy.dialects import registry

from siuba.sql import LazyTbl
from siuba import *

# manually construct BigQuery engine following the pattern in mock_sqlalchemy_engine
dialect_cls = registry.load('bigquery')   
engine = Engine(None, dialect_cls(), '') 

test_tbl = LazyTbl(engine, "test_table", ["x", "y", "z", "calitp_itp_id"])
q = test_tbl >> mutate(res = _.x.isin([1,2])) >> show_query()

I get this error: AssertionError: Unexpected param: (1, 2) (which is the same error I get when pointing at our actual BigQuery data.) Full error collapsed below for readability.

Full AssertionError --------------------------------------------------------------------------- AssertionError Traceback (most recent call last) Input In [8], in () 9 engine = Engine(None, dialect_cls(), '') 11 test_tbl = LazyTbl(engine, "test_table", ["x", "y", "z", "calitp_itp_id"]) ---> 12 q = test_tbl >> mutate(res = _.x.isin([1,2])) >> show_query() File /opt/conda/lib/python3.9/site-packages/siuba/dply/verbs.py:99, in Pipeable.__rrshift__(self, x) 96 elif callable(x): 97 return Pipeable(calls = [x] + self.calls) ---> 99 return self(x) File /opt/conda/lib/python3.9/site-packages/siuba/dply/verbs.py:104, in Pipeable.__call__(self, x) 102 res = x 103 for f in self.calls: --> 104 res = f(res) 105 return res File /opt/conda/lib/python3.9/site-packages/siuba/siu.py:203, in Call.__call__(self, x) 201 return operator.getitem(inst, *rest) 202 elif self.func == "__call__": --> 203 return getattr(inst, self.func)(*rest, **kwargs) 205 # in normal case, get method to call, and then call it 206 f_op = getattr(operator, self.func) File /opt/conda/lib/python3.9/site-packages/siuba/dply/verbs.py:203, in singledispatch2..wrapper(*args, **kwargs) 200 if not args: 201 return dispatch_func(NoArgs(), **strip_kwargs) --> 203 return dispatch_func(*strip_args, **strip_kwargs) File /opt/conda/lib/python3.9/functools.py:888, in singledispatch..wrapper(*args, **kw) 884 if not args: 885 raise TypeError(f'{funcname} requires at least ' 886 '1 positional argument') --> 888 return dispatch(args[0].__class__)(*args, **kw) File /opt/conda/lib/python3.9/site-packages/siuba/sql/verbs.py:429, in _show_query(tbl, simplify) 426 print(compile_query()) 427 else: 428 # use a much more verbose query --> 429 print(compile_query()) 431 return tbl File /opt/conda/lib/python3.9/site-packages/siuba/sql/verbs.py:417, in _show_query..() 414 @show_query.register(LazyTbl) 415 def _show_query(tbl, simplify = False): 416 query = tbl.last_op #if not simplify else --> 417 compile_query = lambda: query.compile( 418 dialect = tbl.source.dialect, 419 compile_kwargs = {"literal_binds": True} 420 ) 423 if simplify: 424 # try to strip table names and labels where uneccessary 425 with use_simple_names(): File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:494, in ClauseElement.compile(self, bind, dialect, **kw) 489 url = util.preloaded.engine_url 490 dialect = url.URL.create( 491 self.stringify_dialect 492 ).get_dialect()() --> 494 return self._compiler(dialect, **kw) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:558, in ClauseElement._compiler(self, dialect, **kw) 554 def _compiler(self, dialect, **kw): 555 """Return a compiler appropriate for this ClauseElement, given a 556 Dialect.""" --> 558 return dialect.statement_compiler(dialect, self, **kw) File /opt/conda/lib/python3.9/site-packages/pybigquery/sqlalchemy_bigquery.py:222, in BigQueryCompiler.__init__(self, dialect, statement, *args, **kwargs) 220 if isinstance(statement, Column): 221 kwargs["compile_kwargs"] = util.immutabledict({"include_table": False}) --> 222 super(BigQueryCompiler, self).__init__(dialect, statement, *args, **kwargs) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:776, in SQLCompiler.__init__(self, dialect, statement, cache_key, column_keys, for_executemany, linting, **kwargs) 772 # a map which tracks "truncated" names based on 773 # dialect.label_length or dialect.max_identifier_length 774 self.truncated_names = {} --> 776 Compiled.__init__(self, dialect, statement, **kwargs) 778 if self.isinsert or self.isupdate or self.isdelete: 779 if statement._returning: File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:451, in Compiled.__init__(self, dialect, statement, schema_translate_map, render_schema_translate, compile_kwargs) 449 if self.can_execute: 450 self.execution_options = statement._execution_options --> 451 self.string = self.process(self.statement, **compile_kwargs) 453 if render_schema_translate: 454 self.string = self.preparer._render_schema_translates( 455 self.string, schema_translate_map 456 ) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:486, in Compiled.process(self, obj, **kwargs) 485 def process(self, obj, **kwargs): --> 486 return obj._compiler_dispatch(self, **kwargs) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/visitors.py:82, in _generate_compiler_dispatch.._compiler_dispatch(self, visitor, **kw) 79 return visitor.visit_unsupported_compilation(self, err, **kw) 81 else: ---> 82 return meth(self, **kw) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:3343, in SQLCompiler.visit_select(self, select_stmt, asfrom, insert_into, fromhints, compound_index, select_wraps_for, lateral, from_linter, **kwargs) 3339 text += self.get_select_precolumns(select_stmt, **kwargs) 3340 # the actual list of columns to print in the SELECT column list. 3341 inner_columns = [ 3342 c -> 3343 for c in [ 3344 self._label_select_column( 3345 select_stmt, 3346 column, 3347 populate_result_map, 3348 asfrom, 3349 column_clause_args, 3350 name=name, 3351 proxy_name=proxy_name, 3352 fallback_label_name=fallback_label_name, 3353 column_is_repeated=repeated, 3354 need_column_expressions=need_column_expressions, 3355 ) 3356 for ( 3357 name, 3358 proxy_name, 3359 fallback_label_name, 3360 column, 3361 repeated, 3362 ) in compile_state.columns_plus_names 3363 ] 3364 if c is not None 3365 ] 3367 if populate_result_map and select_wraps_for is not None: 3368 # if this select was generated from translate_select, 3369 # rewrite the targeted columns in the result map 3371 translate = dict( 3372 zip( 3373 [ (...) 3393 ) 3394 ) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:3344, in (.0) 3339 text += self.get_select_precolumns(select_stmt, **kwargs) 3340 # the actual list of columns to print in the SELECT column list. 3341 inner_columns = [ 3342 c 3343 for c in [ -> 3344 self._label_select_column( 3345 select_stmt, 3346 column, 3347 populate_result_map, 3348 asfrom, 3349 column_clause_args, 3350 name=name, 3351 proxy_name=proxy_name, 3352 fallback_label_name=fallback_label_name, 3353 column_is_repeated=repeated, 3354 need_column_expressions=need_column_expressions, 3355 ) 3356 for ( 3357 name, 3358 proxy_name, 3359 fallback_label_name, 3360 column, 3361 repeated, 3362 ) in compile_state.columns_plus_names 3363 ] 3364 if c is not None 3365 ] 3367 if populate_result_map and select_wraps_for is not None: 3368 # if this select was generated from translate_select, 3369 # rewrite the targeted columns in the result map 3371 translate = dict( 3372 zip( 3373 [ (...) 3393 ) 3394 ) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:3175, in SQLCompiler._label_select_column(self, select, column, populate_result_map, asfrom, column_clause_args, name, proxy_name, fallback_label_name, within_columns_clause, column_is_repeated, need_column_expressions) 3169 result_expr = col_expr 3171 column_clause_args.update( 3172 within_columns_clause=within_columns_clause, 3173 add_to_result_map=add_to_result_map, 3174 ) -> 3175 return result_expr._compiler_dispatch(self, **column_clause_args) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/visitors.py:82, in _generate_compiler_dispatch.._compiler_dispatch(self, visitor, **kw) 79 return visitor.visit_unsupported_compilation(self, err, **kw) 81 else: ---> 82 return meth(self, **kw) File /opt/conda/lib/python3.9/site-packages/pybigquery/sqlalchemy_bigquery.py:272, in BigQueryCompiler.visit_label(self, within_group_by, *args, **kwargs) 270 if within_group_by: 271 kwargs["render_label_as_label"] = args[0] --> 272 return super(BigQueryCompiler, self).visit_label(*args, **kwargs) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:1496, in SQLCompiler.visit_label(self, label, add_to_result_map, within_label_clause, within_columns_clause, render_label_as_label, result_map_targets, **kw) 1488 if add_to_result_map is not None: 1489 add_to_result_map( 1490 labelname, 1491 label.name, 1492 (label, labelname) + label._alt_names + result_map_targets, 1493 label.type, 1494 ) 1495 return ( -> 1496 label.element._compiler_dispatch( 1497 self, 1498 within_columns_clause=True, 1499 within_label_clause=True, 1500 **kw 1501 ) 1502 + OPERATORS[operators.as_] 1503 + self.preparer.format_label(label, labelname) 1504 ) 1505 elif render_label_only: 1506 return self.preparer.format_label(label, labelname) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/visitors.py:82, in _generate_compiler_dispatch.._compiler_dispatch(self, visitor, **kw) 79 return visitor.visit_unsupported_compilation(self, err, **kw) 81 else: ---> 82 return meth(self, **kw) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:2190, in SQLCompiler.visit_binary(self, binary, override_operator, eager_grouping, from_linter, lateral_from_linter, **kw) 2188 disp = self._get_operator_dispatch(operator_, "binary", None) 2189 if disp: -> 2190 return disp(binary, operator_, **kw) 2191 else: 2192 try: File /opt/conda/lib/python3.9/site-packages/pybigquery/sqlalchemy_bigquery.py:313, in BigQueryCompiler.visit_in_op_binary(self, binary, operator_, **kw) 311 def visit_in_op_binary(self, binary, operator_, **kw): 312 return self.__in_expanding_bind( --> 313 self._generate_generic_binary(binary, " IN ", **kw) 314 ) File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/compiler.py:2256, in SQLCompiler._generate_generic_binary(self, binary, opstring, eager_grouping, **kw) 2249 kw["_in_binary"] = True 2250 kw["_binary_op"] = binary.operator 2251 text = ( 2252 binary.left._compiler_dispatch( 2253 self, eager_grouping=eager_grouping, **kw 2254 ) 2255 + opstring -> 2256 + binary.right._compiler_dispatch( 2257 self, eager_grouping=eager_grouping, **kw 2258 ) 2259 ) 2261 if _in_binary and eager_grouping: 2262 text = "(%s)" % text File /opt/conda/lib/python3.9/site-packages/sqlalchemy/sql/visitors.py:82, in _generate_compiler_dispatch.._compiler_dispatch(self, visitor, **kw) 79 return visitor.visit_unsupported_compilation(self, err, **kw) 81 else: ---> 82 return meth(self, **kw) File /opt/conda/lib/python3.9/site-packages/pybigquery/sqlalchemy_bigquery.py:437, in BigQueryCompiler.visit_bindparam(self, bindparam, within_columns_clause, literal_binds, skip_bind_expression, **kwargs) 434 assert_(param != "%s", f"Unexpected param: {param}") 436 if bindparam.expanding: --> 437 assert_(self.__expanded_param(param), f"Unexpected param: {param}") 438 param = param.replace(")", f":{bq_type})") 440 else: File /opt/conda/lib/python3.9/site-packages/pybigquery/sqlalchemy_bigquery.py:63, in assert_(cond, message) 61 def assert_(cond, message="Assertion failed"): # pragma: NO COVER 62 if not cond: ---> 63 raise AssertionError(message) AssertionError: Unexpected param: (1, 2)
machow commented 2 years ago

I get this error: AssertionError: Unexpected param: (1, 2)

Ah, I think it's likely an issue with whatever version of the sqlalchemy-bigquery dialect is installed (edit: I just logged into calitp's cluster, and I think it's pybigquery==0.10.2). They rebranded it to sqlalchemy-bigquery.

machow commented 2 years ago

Going to close, since the issue with .isin should resolve with upgrading, but let's re-open / open a new issue if the verb_dispatch usecase becomes relevant