apache / superset

Apache Superset is a Data Visualization and Data Exploration Platform
https://superset.apache.org/
Apache License 2.0

Create a PandasDatasource #3302

Closed rhunwicks closed 5 years ago

rhunwicks commented 7 years ago

Superset version

0.19.0

Expected results

There are a large number of Issues asking about adding new Datasources / Connectors:

  1. #381
  2. #2790
  3. #2468
  4. #945
  5. #241
  6. #600
  7. #245

Unfortunately, I can't find any examples of a working third party datasource / connector on Github and I think this is possibly because of the complexity and level of effort required to implement a BaseDatasource subclass with all the required methods. In particular, it needs to be able to report the schema and do filtering, grouping and aggregating.

Pandas has great import code, and I have seen Pandas proposed as a method for implementing a CSV connector - see #381: read the CSV using Pandas, output it to SQLite, and then connect to SQLite using the SQLA Datasource to create the slices.

This approach could be extended to other data formats that Pandas can read, e.g. Excel, HDF5, etc.

However, it is not ideal because the SQLite file will potentially be out of date as soon as it is loaded.

I'd like to propose an alternative: a PandasDatasource that allows the user to specify the import method (read_csv, read_table, read_hdf, etc.) and a URL, and which then queries the URL using that method to create a DataFrame. It reports the columns available and their types based on the dtypes of the DataFrame, and by default it allows grouping, filtering and aggregating using Pandas' built-in functionality.
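
A minimal sketch of the idea, outside of Superset's actual datasource API (all names here are illustrative):

import pandas as pd

# Map the user-selected import method to a Pandas reader
READERS = {
    'csv': pd.read_csv,
    'table': pd.read_table,
    'hdf': pd.read_hdf,
    'excel': pd.read_excel,
}

def get_dataframe(source_url, fmt, **reader_kwargs):
    """Fetch the remote data and return it as a DataFrame."""
    return READERS[fmt](source_url, **reader_kwargs)

def get_columns(df):
    """Report column names and types based on the DataFrame's dtypes."""
    return [{'column_name': name, 'type': str(dtype)}
            for name, dtype in df.dtypes.items()]

def query(df, filters=None, groupby=None, metrics=None):
    """Filter, group and aggregate entirely in Pandas."""
    for col, op, val in (filters or []):   # e.g. ('region', '==', 'EU')
        if op == '==':
            df = df[df[col] == val]
        elif op == 'in':
            df = df[df[col].isin(val)]
    if groupby and metrics:
        # metrics like {'sales': 'sum', 'price': 'mean'}
        df = df.groupby(groupby).agg(metrics).reset_index()
    return df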

I realize that this approach won't work for very large datasets that could overwhelm the memory of the server, but it would work for my use case and probably for many others. The results of the read, filter, group and aggregate would be cached anyway, so the large memory usage is potentially only temporary.

This would also make it very much easier for people working with larger datasets to create a custom connector to suit their purposes. For example, someone wanting to use BigQuery (see #945) could extend the PandasDatasource to use read_gbq and to pass the filter options through to BigQuery but still rely on Pandas to do grouping and aggregating. Given that starting point, someone else might come along later and add the additional code necessary to pass some group options through to BigQuery.
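
As a rough illustration of that kind of pushdown (this is not a working BigQuery connector; source_url, table and project_id are assumed attributes):

import pandas as pd

class PandasDatasource(object):
    def get_dataframe(self, filters=None):
        # Base behaviour: fetch everything and filter later in Pandas
        return pd.read_csv(self.source_url)

class BigQueryDatasource(PandasDatasource):
    def get_dataframe(self, filters=None):
        # Push the filters down to BigQuery as a WHERE clause...
        where = ' AND '.join(
            "{} {} '{}'".format(col, op, val)
            for col, op, val in (filters or []))
        sql = 'SELECT * FROM {}'.format(self.table)
        if where:
            sql += ' WHERE ' + where
        # ...but still rely on Pandas for grouping and aggregating
        return pd.read_gbq(sql, project_id=self.project_id)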

The point is that instead of having to write an entire Datasource and implement all methods, you could extend an existing one to scratch your particular itch, and over time as more itches get scratched we would end up with a much broader selection of datasources for Superset.

xrmx commented 7 years ago

There are Druid and SqlAlchemy connectors in the source code and a PR for Elasticsearch. The idea is sound - looking forward to a PR.

mistercrunch commented 7 years ago

So we want the web server(s) to be stateless (Superset isn't a database!), that probably means that at each web request PandasConnector would have to read the raw data, filter/aggregate/compute, and flush the results so they can be garbage collected.

Seems pretty inefficient - why not flush your dataframe to a proper SQL-speaking database and use the SQLA connector?

An Apache Arrow connector seems like a more sensible idea, though I'm unclear on the specifics.

rhunwicks commented 7 years ago

@mistercrunch I agree that we want the web server(s) to be stateless.

Our use case is that we want to read data from a REST API that serves up (among other things) CSV-formatted data. I can't just flush the dataframe to a SQLA database as a one-off exercise because I need the slice to be able to show up-to-date information (bearing in mind the cache timeout on the slice).

I'm new to Superset, so I may have misunderstood the processing - in which case, please correct me...

There seem to be two main types of web request going on:

  1. requests from the editors (slices or SQL Lab) while an analyst is designing a new slice.
  2. requests from the slices when they are being visualized

With a SQLA connector, each web request involves creating a connection to the DB (unless pooled) and executing a query which does the read/filter/aggregate/compute and returns the data. I.e. the work to be done is the same, but in the SQLA case we are doing it on the database server, whereas I was proposing doing it on the web server. I recognize that doing it on the database server is sensible for most use cases.

My understanding of how Superset works is that once the slices have been designed, then the results of the query can be cached. I.e. once a slice has been published that does its filter/aggregate/compute in Pandas, it would only be doing it once per cache expiry period, because the result of that computation would be stored in the cache and each time the slice is viewed the result would be returned directly from cache.
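
A simplified illustration of that cache behaviour (Superset's real cache is keyed and configured differently):

import time

_cache = {}

def cached_result(cache_key, compute, timeout=3600):
    """Return a cached result if it is younger than `timeout` seconds,
    otherwise run the expensive read/filter/group/aggregate again."""
    now = time.time()
    if cache_key in _cache:
        result, stored_at = _cache[cache_key]
        if now - stored_at < timeout:
            return result
    result = compute()
    _cache[cache_key] = (result, now)
    return result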

I did think of an alternative approach where PandasConnector is a subclass of SqlaTable with extra fields/columns for the remote URL and the Pandas read method. When we define the "table", we pull the file/URL, read it using read_csv or whatever method has been designated, and then save it to the designated table name using DataFrame.to_sql. Then we can allow all the functionality inherited from `SqlaTable` to be used for filtering, grouping, etc.

If we take this approach, then each time we need to read the data from the SQLA table, we need to first fetch the up-to-date data from the remote URL, read it into Pandas, drop the existing SQL table, write a new table from the Pandas DataFrame, and then execute the query. As far as I can tell, executing the query involves reading the results back into a Pandas DataFrame using pd.read_sql. This also seems inefficient, and in many cases it will be slower than the pure Pandas approach, because the cost of writing the table to the database just in order to execute the query and then read it back could well exceed the time taken to just do the filter/group/aggregate in Pandas. However, the SqlaTable could be in a SQLite :memory: database, or a "temporary" database (memory with overflow to disk), which might speed things up a little.
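
A rough sketch of that SqlaTable-based alternative, assuming an in-memory SQLite database as the interim store (the function is illustrative, not Superset code):

import pandas as pd
from sqlalchemy import create_engine

def refresh_and_query(source_url, table_name, sql):
    engine = create_engine('sqlite://')  # SQLite :memory: database
    df = pd.read_csv(source_url)         # or read_table/read_hdf/etc.
    # Replace the interim table with the freshly fetched data
    df.to_sql(table_name, engine, if_exists='replace', index=False)
    # Execute the slice's query against the interim table and read it back
    return pd.read_sql(sql, engine)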

mistercrunch commented 7 years ago

I'm guessing your REST API is backed by some sort of database - couldn't you just access that?

Alternatively, if you don't own that database and want to run analytical workloads off of it, shouldn't you just write a data pipeline to bring that data over into a local datastore?

How would the owner of that service feel about you front-loading the cache in the morning? You'd potentially be hitting that REST API pretty hard.

The issue with the Pandas approach is you can't "pushdown" the filtering and/or aggregations, meaning the web server has to do all that heavy lifting. It's true that things would get cached, but any slice-dicing beyond the cache would fall off a perf cliff.

There's no reason why it wouldn't work for small datasets, though I doubt it would be very useful to others.

rhunwicks commented 7 years ago

@mistercrunch you're right that the REST API is backed by a database, but although it is ours and we control it, the data comes from a third party product and so we don't control the schema. The data is largely stored in a monolithic table with a JSONB column that holds the data of real interest. The fields in the JSONB column vary in a NoSQL fashion. The REST API extracts this data and presents it in a recognizable tabular form to the analysts.

I don't think I could give the analysts access to the underlying table using Superset because it would be too hard for them to write the SQL to extract the data values that they want from the JSONB column.

We have considered creating views on the monolithic table that provide the correct view of the data, and we could automate that step. But if we give analysts direct access to those views then we lose all the fine-grained permissions that are enforced by the REST API, and we would have to duplicate the permissions model in PostgreSQL.

We think we have a design for pushing down the filtering to our specific REST API in a subclass of a generic PandasConnector. I can also imagine how some other specific implementations might be able to push down grouping and aggregating.

At the moment, though, any custom connector has to be prepared to implement the whole BaseDatasource API, and that will be too hard and/or expensive for many people. I was hoping for a generic connector that could be used as a starting point for those who want to connect to niche data sources. A generic connector that uses Pandas to read and then a local SQLA database as an interim data store would also work.

I accept that my use case with small data volumes from a REST API is not the main target audience for Superset and it might not be that valuable to others.

mistercrunch commented 7 years ago

If you want to build it and get to a point where it works well, we could add it to a contrib folder where the core team wouldn't really support it but it could ship along with Superset.

rhunwicks commented 6 years ago

@mistercrunch please can you provide some guidance on how we would handle migrations in a contrib module, and/or lay out the directory structure.

For example, if we have

project_root/
    contrib/
        connectors/
            pandas
        migrations/
    superset

Then we need a way to tell Superset to include those migrations when we migrate the db. We can do that by changing the value of version_locations in alembic.ini but do we need to make that change in superset/migrations/alembic.ini? Or do you want a separate alembic.ini in the contrib/migrations folder.

A separate one keeps a loose coupling between the contrib folder and superset. But it adds more complexity to the setup and management.

rhunwicks commented 6 years ago

@mistercrunch please can you also provide some guidance on how we would handle changes to the JavaScript client? Specifically, we would need to change (at a minimum) assets/javascripts/explore/stores/visTypes.js in order to set up the correct sectionsToRender based on the new datasourceType.

A solution where the DataSource registers which sections apply and they are passed to the client in bootstrap_data from the view seems more appropriate than amending visTypes.js directly (because it creates a dependency between superset and contrib that you might not want). That change to the superset core is something we wouldn't be comfortable designing or building.

I think that these issues (both setting up the JavaScript client and adding new migrations) will apply to most other custom connectors, so if we want to encourage development of a wider range of connectors then we will probably need to solve them.

xrmx commented 6 years ago

I really don't see how having migrations for the superset database separated from the upstream ones can work. Any chance you could make your code public so we can take a look at what is needed?

rhunwicks commented 6 years ago

@xrmx the code is still very rough - I haven't pushed it to GitHub yet - partially because I want to make sure the directory structure matches your expectations in order to get it merged!

I am also not planning to push it to GitHub until I have an actual working implementation that can do filtering and grouping, and we haven't got that far yet.

However, we need to store metadata about the DataSources and the associated Columns and Metrics. The required metadata is very similar to that required for SqlaTable datasources, but not identical.

So the migration just adds three new tables. Currently the migration looks like:

"""Add PandasDatasource

Revision ID: b2cd059e8803
Revises: ca69c70ec99b
Create Date: 2017-08-30 09:33:40.431377

"""

# revision identifiers, used by Alembic.
revision = 'b2cd059e8803'
down_revision = 'ca69c70ec99b'

from alembic import op
import sqlalchemy as sa
import sqlalchemy_utils

FORMATS = [
    ('csv', 'CSV'),
    ('html', 'HTML')
]

def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('pandasdatasources',
    sa.Column('created_on', sa.DateTime(), nullable=True),
    sa.Column('changed_on', sa.DateTime(), nullable=True),
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('description', sa.Text(), nullable=True),
    sa.Column('default_endpoint', sa.Text(), nullable=True),
    sa.Column('is_featured', sa.Boolean(), nullable=True),
    sa.Column('filter_select_enabled', sa.Boolean(), nullable=True),
    sa.Column('offset', sa.Integer(), nullable=True),
    sa.Column('cache_timeout', sa.Integer(), nullable=True),
    sa.Column('params', sa.String(length=1000), nullable=True),
    sa.Column('perm', sa.String(length=1000), nullable=True),
    sa.Column('name', sa.String(length=100), nullable=False),
    sa.Column('source_url', sa.String(length=1000), nullable=False),
    sa.Column('format', sqlalchemy_utils.types.choice.ChoiceType(FORMATS), nullable=False),
    sa.Column('additional_parameters', sqlalchemy_utils.types.json.JSONType(), nullable=True),
    sa.Column('user_id', sa.Integer(), nullable=True),
    sa.Column('fetch_values_predicate', sa.String(length=1000), nullable=True),
    sa.Column('main_dttm_col', sa.String(length=250), nullable=True),
    sa.Column('created_by_fk', sa.Integer(), nullable=True),
    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
    sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_table('pandascolumns',
    sa.Column('created_on', sa.DateTime(), nullable=True),
    sa.Column('changed_on', sa.DateTime(), nullable=True),
    sa.Column('column_name', sa.String(length=255), nullable=True),
    sa.Column('verbose_name', sa.String(length=1024), nullable=True),
    sa.Column('is_active', sa.Boolean(), nullable=True),
    sa.Column('type', sa.String(length=32), nullable=True),
    sa.Column('groupby', sa.Boolean(), nullable=True),
    sa.Column('count_distinct', sa.Boolean(), nullable=True),
    sa.Column('sum', sa.Boolean(), nullable=True),
    sa.Column('avg', sa.Boolean(), nullable=True),
    sa.Column('max', sa.Boolean(), nullable=True),
    sa.Column('min', sa.Boolean(), nullable=True),
    sa.Column('filterable', sa.Boolean(), nullable=True),
    sa.Column('description', sa.Text(), nullable=True),
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('pandasdatasource_id', sa.Integer(), nullable=True),
    sa.Column('created_by_fk', sa.Integer(), nullable=True),
    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
    sa.ForeignKeyConstraint(['pandasdatasource_id'], ['pandasdatasources.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_table('pandasmetrics',
    sa.Column('created_on', sa.DateTime(), nullable=True),
    sa.Column('changed_on', sa.DateTime(), nullable=True),
    sa.Column('metric_name', sa.String(length=512), nullable=True),
    sa.Column('verbose_name', sa.String(length=1024), nullable=True),
    sa.Column('metric_type', sa.String(length=32), nullable=True),
    sa.Column('description', sa.Text(), nullable=True),
    sa.Column('is_restricted', sa.Boolean(), nullable=True),
    sa.Column('d3format', sa.String(length=128), nullable=True),
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('pandasdatasource_id', sa.Integer(), nullable=True),
    sa.Column('source', sa.Text(), nullable=True),
    sa.Column('expression', sa.Text(), nullable=True),
    sa.Column('created_by_fk', sa.Integer(), nullable=True),
    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
    sa.ForeignKeyConstraint(['pandasdatasource_id'], ['pandasdatasources.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    # ### end Alembic commands ###

def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('pandasmetrics')
    op.drop_table('pandascolumns')
    op.drop_table('pandasdatasources')
    # ### end Alembic commands ###

rhunwicks commented 6 years ago

@xrmx said:

don't see how having migrations for the superset database separated from the upstream one can work

In theory, we could change the superset/migrations/env.py to read the value of version_locations from current_app.config the same way it reads sqlalchemy.url. Then anyone wanting to deploy a homegrown connector could add a new directory alongside superset:

superset/
    migrations/
xmymodule/
    connectors/
    migrations/
        versions/

And then they could update their local superset_config.py to both load the custom connector and update the version locations:

# Include additional data sources
ADDITIONAL_MODULE_DS_MAP = {
    'xmymodule.connectors.pandas.models': ['PandasDatasource'],
}
# And their migrations
ADDITIONAL_VERSION_LOCATIONS = ['xmymodule/migrations/versions']

Possibly you want to specify the version locations as modules rather than directories:

ADDITIONAL_VERSION_LOCATIONS = ['xmymodule.migrations.versions']

It would create an extension point that would allow people to add new functionality to Superset that requires migrations, without requiring them to maintain a fork. And it would make it easier for other people to choose to deploy those extensions in their Superset installation without having to try to merge code into their own fork - they could just deploy Superset and git clone the repos for the extension(s) they want.

rhunwicks commented 6 years ago

For reference, the PR for the Elasticsearch connector is at https://github.com/apache/incubator-superset/pull/2953/files

rhunwicks commented 6 years ago

I think you could enable an ADDITIONAL_VERSION_LOCATIONS setting in superset_config.py by adding the following code to superset/cli.py:

migrate = app.extensions['migrate'].migrate

@migrate.configure
def configure_version_locations(migrate_config):
    # Get additional migration locations
    locations = config.get('ADDITIONAL_VERSION_LOCATIONS', [])
    # Include the main superset migrations
    locations.append('%(here)s/versions')
    migrate_config.set_main_option('version_locations', ' '.join(locations))
    return migrate_config

xrmx commented 6 years ago

@rhunwicks my worry is that contrib migrations may have references to upstream models that will change in the future. So when applied at a different time they may break.

rhunwicks commented 6 years ago

@xrmx it's true that if someone developed a model that had a dependency on a core Superset model, and that model then changed significantly, the extension might stop working. But that would be the case whether or not the dependent migration was in the core superset/migrations/versions directory. The impression I have from @mistercrunch's comments above is that if we can write a PandasConnector it would be non-core anyway, and given that objective it makes sense to try and keep the extension as loosely coupled as possible.

Alembic, and thus Flask-Migrate, is quite capable of dealing with a branched migration tree, so changes can be made to the main models in the main tree without affecting the extension migrations provided that either they are not dependent on any upstream models or the upstream models don't change in a breaking way. For example, our models have a foreign key to ab_user but as long as changes to that model don't alter the primary key I don't think it will make any difference.

If someone wrote an extension that did stop working as a result of change to an upstream model then they would need to update their component to account for the changes in order to continue working with the most recent version of Superset. This is a normal workflow for an add on component for an open source project.

On balance, I think that it is preferable to have an ADDITIONAL_VERSION_LOCATIONS config parameter that would allow me to (attempt to) develop a PandasConnector that is minimally invasive to core Superset. I think that the smaller the impact it has on core Superset, the better chance it has of being successful. The same logic might apply to other potential extensions - as I described at the start of this issue, there have been many requests for additional connectors, and making it easier for me and others to develop them without needing changes to core Superset or having to maintain a fork would likely increase the availability of such connectors.

However, my main objective is to write a connector that can work with core Superset on an ongoing basis. So if the core team decide that Connectors should live in a contrib directory and keep their migrations in the main superset/migrations/versions directory then I'll happily comply.

rhunwicks commented 6 years ago

One approach that might be relevant for writing custom connectors is https://github.com/yhat/pandasql.

That project allows you to use SQLite-compatible SQL to query a Pandas DataFrame. It does this (AFAIK) by automatically serializing the dataframe to a temporary SQLite DB, executing the query against it, and reading back the result.
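
A quick example of the pandasql usage I have in mind (the DataFrame here is made up):

import pandas as pd
from pandasql import sqldf

df = pd.DataFrame({'region': ['EU', 'EU', 'US'], 'sales': [10, 20, 30]})
result = sqldf("SELECT region, SUM(sales) AS total FROM df GROUP BY region", locals())
print(result)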

In theory, that would allow us to create a new connector that inherited from SqlaTable and forced the database dialect to SQLite and then the existing query() method could be reused entirely. I don't know how functional it would be, but in theory if it serializes to a SQLite table and then executes the query using the real engine, then any valid SQLite query should work.

I am not planning on using that approach myself, because I think that the overhead of serializing and then deserializing the rows will be too much, but it does solve some of the other problems, and so I am mentioning it here in case it is relevant for anyone else.

xrmx commented 6 years ago

@rhunwicks Thanks for the thorough analysis.

I'd really hope we can have a Pandas connector as a first class citizen. Imagine how useful that would be for non-tech-savvy organizations, where they could just upload a CSV or an Excel file and explore it.

rhunwicks commented 6 years ago

@mistercrunch when is the order of records returned by the query function significant?

Looking at SqlaTable.query(): is it important that when is_timeseries == True the rows are returned in __timestamp order?

I assume not, because when we have both is_timeseries and groupby, SqlaTable still returns the rows ordered by main_metric_expr in descending order, rather than by either __timestamp or groupby.

rhunwicks commented 6 years ago

@mistercrunch the Travis-CI run for the PR fails because it doesn't run the migration for PandasConnector, but I can't fix that until I know whether you want to:

  1. Keep the migration in the contrib directory and update cli.py to allow multiple migration locations
  2. Keep the migration in the main superset/migrations/versions directory - which simplifies running and maintaining the migrations but creates a tighter link between the PandasConnector and the main code.

Please can you look at the PR when you get a chance and let me know which way you want me to set it up?

rhunwicks commented 6 years ago

Is it important that when is_timeseries == True the rows are returned in __timestamp order?

I think not, because it looks like 816c517 has made is_timeseries without a groupby also use main_metric_expr for ordering.

I'll update the tests in the PR to account for that.

rhunwicks commented 6 years ago

@mistercrunch I have added some docs to the PR for a Pandas Connector, and fixed it up to pass the flake8 test in tox.ini.

I'd appreciate some guidance on whether you think this will make it into the Superset repo at some point, because of the effort required to keep it in a ready to merge state (e.g. fixing merge conflicts in setup.py, updating the migration references when a new migration hits master, etc.). If it won't be merged then we will split it into a separate repo, but as discussed above it would be preferable if we could make some minor changes to Superset to allow us to configure it to find and run the additional migration.

We are using Pandas Connector on our deployment, and for us at least it is much more useful than the Import CSV functionality merged in #3643 because it allows Superset dashboards to include current data from remote files or APIs without someone having to manually upload a new version of the CSV.

jevanio commented 6 years ago

@rhunwicks I want to do a POC of this PandasDatasource, but I'm not sure how to connect Superset to a REST API that already gets data from a db and adds it to a pandas dataframe. Can you help me?

xrmx commented 6 years ago

@jevanio this is unrelated to this PR - please use the mailing list or Gitter.

rhunwicks commented 6 years ago

Having the migrations and requirements mixed in with the main Superset ones without this PR ever being merged causes frequent merge conflicts. Therefore, I have closed PR #3492. We will continue to maintain the connector so that we can use Superset with APIs and remote files, but we will do so in our fork rather than in a PR.

If Superset publishes a specification for developing third party connectors in future then we will repackage our connector according to the specification.

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. For admin, please label this issue .pinned to prevent stale bot from closing the issue.