databricks / koalas

Koalas: pandas API on Apache Spark
Apache License 2.0

Allow querying DataFrame directly in sql method #285

Closed floscha closed 5 years ago

floscha commented 5 years ago

While I really like the idea of @rxin's recent #256 PR, he uses an (in my opinion) over-simplistic example of ks.sql("select * from range(10) where id > 7"). I believe that the ability to query actual Koalas DataFrames through SQL can prove really valuable to many users. However, when trying to use ks.sql with a Koalas DataFrame, the following exception occurs:

>>> kdf = ks.DataFrame({'A': [1, 2, 3]})
>>> ks.sql("select * from kdf")
...
org.apache.spark.sql.AnalysisException: Table or view not found: kdf; line 1 pos 14
...

This is not surprising to someone with PySpark knowledge who knows that kdf has to be registered as a temporary table before being able to use it with SparkSQL. Unfortunately, (as I understand it) the target group of the Koalas library should not be expected to be Spark experts. To get the above example working, the following workaround is needed, which requires the usage of the lower-level (Py)Spark API, thus somewhat defeating the purpose of Koalas.

>>> from pyspark import SparkContext
>>> from pyspark.sql import SQLContext
>>> sc = SparkContext.getOrCreate()
>>> sql = SQLContext(sc)
>>> sql.registerDataFrameAsTable(kdf._sdf, "kdf")
>>> ks.sql("select * from kdf")
   __index_level_0__  A                                                         
0                  0  1
1                  1  2
2                  2  3
# Optionally clean-up by dropping the temporary table
>>> sql.dropTempTable("kdf")

Wouldn't it be much more convenient if this "temporary table magic" were instead handled by Koalas behind the scenes, or are there design objections against such an approach?

HyukjinKwon commented 5 years ago

I think we shouldn't mix PySpark APIs and Koalas APIs. Maybe we should add a method called to_spark(..) and delegate everything there.

As you said, I think ks.sql's purpose is mainly to generate some data or read data from Spark sources only. Maybe we should document this limitation clearly.

So, when they want to switch to the PySpark context, they should explicitly call to_spark(...). Once we start to mix both, it might be difficult for Koalas to target its original goal of providing pandas-corresponding APIs.

floscha commented 5 years ago

I see the point of not mixing the two libraries' APIs. Then again, how does ks.sql relate to the pandas API where no pandas.sql() exists?

Also, the mentioned to_spark() method already exists but I don't quite understand how it relates to the ks.sql method.

HyukjinKwon commented 5 years ago

how does ks.sql relate to the pandas API where no pandas.sql() exists?

I was thinking adding some APIs that don't exist in pandas (with some strong reasons) might make sense (see https://github.com/databricks/koalas#unify-small-data-pandas-api-and-big-data-spark-api-but-pandas-first); however, if that brings a bunch of compatibility issues, I think we should restrict its usage rather than put effort into extending PySpark's API usage.

Also, the mentioned to_spark() method already exists but I don't quite understand how it relates to the ks.sql method.

For instance, sql.registerDataFrameAsTable(kdf.to_spark(), "kdf") will work instead. We could additionally consider a flag that drops the index, simply because PySpark users are not familiar with it anyway.
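
For illustration, a minimal sketch of that explicit boundary, reusing the SparkContext/SQLContext setup (sc and sql) from the workaround in the first comment; the variable names are only for the example:

>>> sdf = kdf.to_spark()                       # Koalas -> Spark DataFrame, explicit hand-off
>>> sql.registerDataFrameAsTable(sdf, "kdf")   # register through the PySpark API
>>> ks.sql("select * from kdf")                # then query it via ks.sql
>>> sql.dropTempTable("kdf")                   # optional clean-up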

thunterdb commented 5 years ago

@floscha it is interesting that you mention that topic now. I was looking into how to port pandasql to koalas, taking into account python's normal syntax for string interpolation. As an example, this statement:

date = pd.Timestamp('2012-05-01')
ids = [1,2,3]
kdf = ks.read_csv(...)
filtered_kdf = ks.sql(""" SELECT m.field1, m.field2 FROM {kdf} m WHERE m.date >= {date} and m.ids in {ids} """)

This is equivalent to:

kdf.to_spark().registerTempView("_koalas_tmp_1")
filtered_kdf = ks.sql(""" SELECT m.field1, m.field2 FROM _koalas_tmp_1  m WHERE m.date >= '2012-...' and m.ids in (1,2,3) """)

The variables are not necessary as a start, but they are very convenient, because I see a lot of errors with string interpolation in general. pandasql uses a regex to extract the variables that seems good enough to get started with.

This syntax has the advantage that it is safe from injection attacks, and it is familiar to any Python user (it is the same as str.format()).

What are the thoughts about this design?

thunterdb commented 5 years ago

I forgot the link: https://github.com/yhat/pandasql They chose not to use brackets. The advantage of using brackets is that any Python user will be familiar with the format, the IDE will probably have autocomplete (I see that GitHub makes the proper highlights), and it allows the interpolation of arbitrary values.
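
For comparison, pandasql usage looks roughly like this: sqldf takes the query plus an environment such as locals() and resolves bare, un-bracketed table names against it.

>>> import pandas as pd
>>> from pandasql import sqldf
>>> df = pd.DataFrame({'A': [1, 2, 3]})
>>> sqldf("SELECT * FROM df WHERE A > 1", locals())  # bare name "df", no brackets; returns the rows where A is 2 and 3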

floscha commented 5 years ago

Hey @thunterdb, I also had pandasql in the back of my head but wasn't quite sure whether or not you consider it to be "actual" pandas. Injecting values from variables is definitely a cool feature to have, but I think first of all there needs to be a decision on whether the functionality to run SQL queries should be part of Koalas or rather something external like KoalaSQL 😉

@HyukjinKwon I don't really see compatibility issues since there is no pandas.sql() method that could cause name conflicts. On the other hand, as soon as users start creating Spark/SQLContexts and temporary tables, I believe they are comfortable enough to use PySpark instead of Koalas anyway. So having code like sql.registerDataFrameAsTable(kdf.to_spark(), "kdf") seems quite counter-intuitive to me.

HyukjinKwon commented 5 years ago

@floscha, what I mean by compatibility is between Koalas DataFrame and Spark DataFrame. Now we have to consider compatibility with Spark SQL <> Koalas DataFrame as well.

On the other hand, as soon as users start creating Spark/SQLContexts and temporary tables, I believe they are comfortable enough to use PySpark instead of Koalas anyway.

Yes, in that case we should rather avoid adding Spark-related APIs, since we target users who want to use something different from PySpark. A proper fence is sometimes needed to distinguish the two.

floscha commented 5 years ago

@HyukjinKwon, what compatibility between Spark SQL and Koalas DataFrame do you mean? The project already has several dependencies on Spark SQL that require compatibility in some sense.

And again, ks.sql() is already a method that has no equivalent in pandas but rather directly translates to PySpark's SparkSession.sql() method. Since it defines itself as "Execute a SQL query and return the result as a Koalas DataFrame.", I'd say the only two consistent options would be to also allow using Koalas DataFrames in such SQL queries or to drop the method altogether.

Maybe @rxin also has an opinion on this since he introduced the ks.sql() method in #256.

rxin commented 5 years ago

Great discussion. Here are my thoughts: my vision is that there is a nontrivial chance Koalas becomes the de facto standard "big data science" API over time.

That is, when data scientists work with big data, they use Koalas. They know it runs on Spark (or maybe even some other thing in the future), but they don't necessarily know the PySpark API because that's all they have learned (when they transition from pandas over to Koalas).

To accomplish that, it's necessary to make sure that, from a functionality point of view, the Koalas API is a superset of PySpark's (at least when it comes to the structured APIs; I'd not consider RDD or MLlib here).

A lot of production data sits in data lakes (S3, Azure blob store, HDFS, etc). If a data scientist wants to work with some production data, the first thing he/she will likely do is to directly load in that path, table, or run a SQL query. In this context, I think it's necessary to have the "sql" function.

Similarly, in order to work with big data, some additional functionalities that are unique to PySpark but not pandas are critical, e.g. "cache", "repartition", even though some data scientists will never use that.

One thing you could argue is that since Koalas runs on top of Spark, and it is trivial to convert between Spark DataFrames to Koalas DataFrames, why not just leave all the Spark specific functions in Spark and have users fall back to those?

To answer that question, we should consider the journey of data scientists. Let's use "Alice" as an example:

  1. Alice learns how to do data science using pandas from MOOCs
  2. Alice works on some real world data science problems with pandas
  3. Alice needs to work with some large production dataset in S3. Before Koalas (is mature/popular), she learns that PySpark is the thing to use and she now needs to learn a brand new API, which is daunting. After Koalas (is mature/popular), Alice reads some quick start guide on Koalas and then can immediately start applying most of what she learned from pandas to Koalas.
  4. Alice runs into some trouble that is specific to big data. For example, some operation that used to be super fast on a tiny amount of data is now super slow on a large amount of data. She reads up on how to address those issues. Hypothetically speaking, let's say Alice's more senior coworker tells her she just needs to cache the data to make the workload go faster.

At this point, which one do you think is easier for Alice?

Option 1. Go research and figure out how to do cache. Alice hasn't really been exposed to any of the Spark API yet, so even reading up the Spark API reference doc is daunting.

Option 2. Alice tries to do df.ca and tab autocomplete suggests "cache", which seems like what she wants to do. She confirms that by looking at the reference doc.

My theory is that it would be significantly easier for Alice and other data scientists to just learn from one place and learn one API. And this doesn't stop at just performance optimizations. Accessing data, plotting, ...

Along that direction, I do think it will be super convenient for data scientists to just run SQL queries in Koalas against Koalas DataFrames, without having to learn about registerTempView. After all, the main objective of the Koalas API should be to reduce friction and make data scientists more productive.

tdliu commented 5 years ago

Those are really good points... In any case, making the overall goal clear is important. Summarizing the discussion: we can go in two directions.

  1. Make Koalas mirror pandas as much as possible, 1:1. In this scenario, we only implement the gaps needed to make pandas work on big data. Users familiar with the Python DS ecosystem will be able to migrate single-node workloads with minimal changes. Also crucially, they can migrate work back to single node trivially. However, as popular as pandas is, it still has its quirks. It is not trivial for SQL or R users to pick up.
  2. Build the standard big data API, where we augment pandas. We implement pandas' methods and are careful not to deviate. As we've seen from PySpark, slight differences can really trip up users. Instead of just copying pandas, however, we are user-driven. There are many more users who know SQL than pandas, for instance. Combining the two could be extremely powerful - more powerful than pandas. The tradeoff is that moving from pandas to Koalas is no longer bidirectional.

I think the bidirectional use case is actually really important. Within an organization, there are many patterns for working with big data. For example, many developers still have a strong preference for sampling data and developing locally. If Koalas and pandas are not 1:1, there could be issues with handoffs.

rxin commented 5 years ago

@tdliu they could run Koalas locally with Spark though :) No setup needed.

rxin commented 5 years ago

BTW on this specific function ... I can think of three ways:

  1. Be more "magical" and just automatically import all locals() and globals() DataFrames, and create temporary views for them. This would be by far the most convenient but also fairly error prone. So the following will work out of the box:

my_range = ks.range(1000)
ks.sql("select count(*) from my_range")

  2. Similar to pandasql, require explicitly passing in the list of DataFrames. The above will now look like this:

my_range = ks.range(1000)
ks.sql("select count(*) from my_range", locals())

Users can accomplish option 1 by defining a method:

sql = lambda q: ks.sql(q, globals())

  3. Use the string interpolation @thunterdb outlined above, e.g.

my_range = ks.range(1000)
ks.sql("select count(*) from {my_range}")

floscha commented 5 years ago

Hi @rxin, thanks for your extensive thoughts on the topic. Here is my opinion on the suggested approaches:

  1. I would definitely prefer this version of the ks.sql() API since it directly translates to common SQL which I assume is what users would expect. Also, it would be consistent with Spark SQL's sql() method. However, I strongly dislike creating temporary views for all DataFrames out of which the majority might never be used. While this would work for a first proof of concept, in the long term I think it would be cleaner to scan the query for potential DataFrame names (e.g. whatever comes after the from statement) and only create temporary views for those DataFrames. Additionally, the use of locals() and globals(), which would be necessary in this case, can quickly cause problems once users split their code across several Python modules.
  2. Personally, that's the option I like the least since it combines a somewhat awkward API (does the majority of users even understand what locals() is?) with the same technical issues as above by potentially creating temporary views for all DataFrames. Also, the lambda statement only makes it more complicated in my opinion (remember the case where analysts simply want to run a SQL query instead of getting into software development).
  3. What I like about this approach is that it seems the cleanest from a technical perspective. However, it introduces a kind of new SQL dialect which (while easy to learn) would surely confuse most users at first. Also, as described above, I feel like the same could be achieved by parsing the query to find the involved DataFrame names without explicitly marking them with curly brackets.
floscha commented 5 years ago

On the more general vision of Koalas:

icexelloss commented 5 years ago

Great discussion here. I just want to chime in a little on the high level question "Should koalas support sql query" here?

When looking at the question with my engineer hat on, my instinct was "No", because SQL functionality is not part of the core pandas API. However, with my PM hat on, my train of thought is: can a pandas user do SQL with a pandas DataFrame? The answer is yes (with pandasql). Although it's not part of the pandas core API, from a user's point of view it's still part of "pandas". Then I think a user should be able to do SQL with a Koalas DataFrame as well.

So I am leaning towards the answer "Yes"

rxin commented 5 years ago

@floscha "However, I strongly dislike creating temporary views for all DataFrames out of which the majority might never be used"

I don't know if you and I were on the same page. Here's what I had in mind, which I think is strictly a superset of your current implementation, with very little work ...

def sql(query):
    # Sketch: gather candidate DataFrames from the caller's scope (a real
    # implementation would inspect the caller's frame rather than call
    # locals()/globals() inside this function). DataFrame and default_session
    # refer to Koalas' own names, as in the original sketch.
    dfs = {**globals(), **locals()}
    views = []
    for name, obj in dfs.items():
        if isinstance(obj, DataFrame):
            obj.to_spark().createOrReplaceTempView(name)
            views.append(name)

    try:
        # Run the query; wrapping the result back into a Koalas DataFrame is omitted here.
        return default_session().sql(query)
    finally:
        for name in views:
            default_session().catalog.dropTempView(name)

This way - we don't need to build a full SQL parser outside Spark, which would be difficult to keep up to date and consistent with Spark's.

rxin commented 5 years ago

I changed the ticket name to reflect the discussion more.

floscha commented 5 years ago

@rxin I totally agree with you that building our own SQL parser is a bad idea for many reasons (too error-prone, implementing most things twice, potentially different behavior than Spark's, etc.). However, what I don't like too much about your implementation is the overhead created by registering temporary views for absolutely all DataFrames in memory, which makes this approach kind of brute-force-ish. Especially for cases like ks.sql("select * from range(10) where id > 7"), where no temporary tables are required. Then again, maybe the overhead is still small enough to not actually matter.

rxin commented 5 years ago

The overhead is pretty small actually (just adding to some in-memory hashmap in Spark). What we could do to make it even smaller is to tokenize the SQL string and only register a DataFrame if there is a token and a global variable that matches.
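
A rough sketch of that filtering step (the helper name and the regex are made up for illustration; scope would be something like the caller's globals()):

import re
import databricks.koalas as ks

def _matching_dataframes(query, scope):
    # Identifier-like tokens that appear in the SQL string; only DataFrames
    # whose variable name shows up among them would get a temporary view.
    tokens = set(re.findall(r"[A-Za-z_][A-Za-z0-9_]*", query))
    return {name: obj for name, obj in scope.items()
            if name in tokens and isinstance(obj, ks.DataFrame)}

Registration and clean-up would then proceed as in the pseudocode above, but only over this filtered set.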

I also know @thunterdb prefers wrapping the table name with curly braces...

floscha commented 5 years ago

I've removed the "SQL parser" now for #291 and replaced it with the suggested tokens.

floscha commented 5 years ago

One potential issue I was wondering about: if the user runs a query on a DataFrame for which a temporary view has already been created, ks.sql() will crash. However, this can only happen by bypassing Koalas and creating the view through the PySpark API. Therefore, I'm wondering if it's worth explicitly handling such a case, or whether we should rather expect the user not to mess things up 😉

rxin commented 5 years ago

Good point. We probably need to restore those after we are done.

Maybe we should do what @thunterdb wanted - wrap stuff in curly braces and then everything is clear ...
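
For illustration, one way the brace style sidesteps the collision entirely is to register each interpolated DataFrame under a generated temporary name and substitute that name into the query; a minimal sketch (the helper and the naming scheme are made up, not necessarily what Koalas ends up doing):

import uuid

def _register_unique(kdf):
    # A generated name cannot collide with (or accidentally drop) a view the
    # user created themselves through the PySpark API.
    view_name = "_koalas_view_" + uuid.uuid4().hex
    kdf.to_spark().createOrReplaceTempView(view_name)
    return view_name

# With curly-brace interpolation the generated name is substituted for the
# placeholder, e.g. "SELECT * FROM {kdf}" becomes "SELECT * FROM _koalas_view_<hex>".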

thunterdb commented 5 years ago

Great comments here. My main issue with the "magic" approaches (1 and 2, mapping table names straight to variables) is that, as you are finding, there are a few cases of ambiguity:

If you take the statement SELECT m.col FROM my_table m

In SQL / Spark, my_table is referring to a registered table.

In pandasql, a new sqlite DB is created every time a SQL statement is evaluated, so my_table is referring to a variable in the global or local scope.

With options 1/2, there is an ambiguity, because it could either be a variable or an existing table. That means that just renaming a variable, or making a name change like spark_df.name("my_table"), can have an impact much further away in the code, which breaks all sorts of composition rules and gives me shivers at the thought of debugging it.

Another advantage I see with the curly braces / string interpolation style is that you can eventually inject all sorts of expressions into it. Selecting from a column would be valid, for example:

SELECT * FROM {kdf.col + 3} or SELECT m.col1, {my_table.col2} FROM {my_table}

Where it gets very interesting is allowing users to control the representation in the SQL statement for arbitrary objects. For example, injecting scalar variables is trivial to support: SELECT * FROM {my_table} m WHERE m.col > {value}, where value can be a normal primitive. Now, value need not be a primitive, and could be an arbitrary object. Following the Python convention, if this object has a magic method that knows how to represent the object, then developers can make all their objects SQL-friendly. For example:

class MyTimeValue(object):
  ...
  def _sql_repr_(self):
    return "'2019-01-02'" # <- no escaping issue, quotes already added!

value = MyTimeValue("2019-01-02")
kdf = ks.sql("SELECT * FROM {my_table} m WHERE m.col > {value}")

Of course, the implementation is trickier and this is more of a long-term aspiration, but the semantics would be unambiguous and would require no change in the user-facing API to widen the use cases.
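
A sketch of how the interpolation routine might consume such a hook, falling back to simple literal rendering for primitives (the dispatch rules below are made up for illustration):

def _to_sql_fragment(value):
    # Objects opt in via the magic method described above; everything else
    # falls back to a literal rendering.
    if hasattr(value, "_sql_repr_"):
        return value._sql_repr_()
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"  # quote and escape strings
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, (list, tuple)):
        return "(" + ", ".join(_to_sql_fragment(v) for v in value) + ")"
    raise TypeError("cannot render %r as a SQL literal" % (value,))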

@floscha @rxin any thoughts?

thunterdb commented 5 years ago

Also, regarding the parsing of the {}, we can simply reuse the parser from CPython; see this example: https://www.programcreek.com/python/example/98516/_string.formatter_parser
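
Concretely, the standard library's string.Formatter exposes essentially the same parser, so the {}-delimited names can be pulled out without a custom regex; a minimal sketch:

import string

def _interpolated_names(query):
    # Formatter().parse yields (literal_text, field_name, format_spec, conversion)
    # tuples; the field names are the {}-delimited references in the query.
    return [field for _, field, _, _ in string.Formatter().parse(query)
            if field is not None]

_interpolated_names("SELECT * FROM {my_table} m WHERE m.col > {value}")
# -> ['my_table', 'value']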

rxin commented 5 years ago

Yes - I feel we should just support {my_df} for now, since it's a safe choice, and see what type of feedback we get. If everybody tells us to support this without the curly braces, we can change it in the future.

rxin commented 5 years ago

@floscha want to update your existing PR to do that (interpolation with curly braces)? Alternatively you can close this one and open a new one, just in case we want to change to a different design in the future (then we will have easy access to your old code).

thunterdb commented 5 years ago

@floscha I took it upon myself to take your code and make the change to curly braces in #360. Can you comment there and tell me what you think?