h2oai / datatable

A Python package for manipulating 2-dimensional tabular data structures
https://datatable.readthedocs.io
Mozilla Public License 2.0

Aggregator in datatable #1077

Closed lelandwilkinson closed 6 years ago

lelandwilkinson commented 6 years ago

See Java code in https://github.com/h2oai/vis-data-server/blob/master/library/src/main/java/com/h2o/data/Aggregator.java

Plus other classes in that package for support. All of this should be done in C++

nikhilshekhar commented 6 years ago

@st-pasha @oleksiyskononenko any thoughts on how to go about this? We will have to decide on an interface for how the datatable aggregator interacts with the Java visual server.

oleksiyskononenko commented 6 years ago

@nikhilshekhar The idea is to implement the Aggregator in C++ in datatable and make it accessible from Python. Sure, we can discuss the interaction with the Java Visual Server.

oleksiyskononenko commented 6 years ago

Here is a preliminary list of functions that are going to be implemented in datatable:

Exposed function (Python):

Internal functions (C++):
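
To make the interface discussion concrete, here is a minimal sketch of what the exposed Python entry point might look like. The function name, parameters, and defaults below are illustrative assumptions (the option names echo the ones discussed later in this thread), not the finalized interface:

    import datatable as dt

    def aggregate(frame, n_bins=500, radius=None, min_rows=500,
                  max_dimensions=50, seed=0):
        """Hypothetical sketch: reduce `frame` to exemplar rows plus counts.

        A real version would delegate to the internal C++ routines; this stub
        exists only to fix ideas about the Python-facing signature.
        """
        raise NotImplementedError  # the C++ implementation is the subject of this issue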

st-pasha commented 6 years ago

Looks good, except the output should be the original frame grouped into "observation clusters".

We will also add convenience functions to extract the [exemplar row + nobs] from the grouped frame, as well as the list of indices of rows within each group.
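
As an illustration of those convenience functions, here is a minimal sketch in datatable's Python syntax, assuming the aggregator tags each original row with a hypothetical "exemplar_id" column (all names and data here are made up):

    import datatable as dt
    from datatable import f, by, count

    DT = dt.Frame(x=[1.0, 1.1, 5.0, 5.2, 5.1],
                  exemplar_id=[0, 0, 1, 1, 1])

    # Number of observations (nobs) per exemplar -- one row per group:
    nobs = DT[:, count(), by(f.exemplar_id)]

    # Member row indices for each group, collected in plain Python:
    members = {}
    for i, eid in enumerate(DT["exemplar_id"].to_list()[0]):
        members.setdefault(eid, []).append(i)
    # members == {0: [0, 1], 1: [2, 3, 4]}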

nikhilshekhar commented 6 years ago

@oleksiyskononenko I would like to point out a couple of things:

Just to put things in perspective: to visualize one dataset, there can be more than a few hundred or thousand calls to the aggregator, depending on the number of columns in the dataset. The total latency, from the first aggregator call until the visual server has picked up the responses to all subsequent aggregation calls, is expected to be in seconds.

Everything else looks good, but we need to finalize the interactions between the visual server (which is a Java library) and the aggregator. @oleksiyskononenko @lelandwilkinson @mmalohlava

The initial chain of calls that @mmalohlava and I discussed is as below: the visual server (Java library) invokes a system call to a Python script, which in turn calls the Aggregator in datatable. The aggregator then returns a response containing the aggregated rows. @oleksiyskononenko @mmalohlava Any thoughts/ideas on how these aggregated rows should be returned to the visual server (a Java process) would be very helpful.
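
A minimal sketch of what that Python script could look like; the flag names are illustrative, and aggregate() here is a placeholder for the hypothetical datatable Aggregator whose interface is exactly what needs to be decided:

    import argparse
    import datatable as dt

    # Placeholder for the hypothetical Aggregator discussed in this issue:
    def aggregate(frame):
        return frame  # a real implementation would return exemplar rows + counts

    def main():
        p = argparse.ArgumentParser()
        p.add_argument("--input", required=True)    # path to the source dataset
        p.add_argument("--columns", required=True)  # comma-separated column names
        p.add_argument("--output", required=True)   # where to write the aggregated rows
        args = p.parse_args()

        DT = dt.fread(args.input)[:, args.columns.split(",")]
        agg = aggregate(DT)      # hypothetical call into the datatable Aggregator
        agg.to_csv(args.output)  # small result handed back to the Java process via a file

    if __name__ == "__main__":
        main()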

@lo5 @mmalohlava this is the new proposed architecture of the visual server/aggregator interaction. Kindly have a look before we start development on this.

st-pasha commented 6 years ago

We already have a channel of communication between the java VizServer and the python DAI. Why can't we piggy-back on that? In terms of retrieving data on Java's end, the data has to get there from C/Python somehow, and I don't think there's a faster way than through the NFF Jay file.

As for doing multiple aggregations at once -- it is certainly doable, but will require a separate method. Such a method could take a list of frames to aggregate and produce, in turn, a list of aggregated/reduced frames. Under the hood, all those aggregations would be carried out in parallel via OMP.
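
In Python-facing terms, the batch method could look like the following sketch; aggregate_many is a hypothetical name, and the real parallelism would live in the C++/OMP layer rather than in this pure-Python stand-in:

    def aggregate_many(frames, **options):
        """Aggregate each frame in `frames`, returning a list of reduced frames.

        Sketch only: a C++ implementation would run the aggregations in
        parallel via OMP instead of this sequential Python loop. `aggregate`
        is the hypothetical single-frame entry point sketched earlier.
        """
        return [aggregate(frame, **options) for frame in frames]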

nikhilshekhar commented 6 years ago

@st-pasha The Visual Server as of today communicates with the python DAI only via JSON-RPC request/response. There is no actual transfer of data that happens there. And if we want to rely on the Visual Server to read data from the Jay file, it would need a Java parser for the Jay file, and that would have to be written and maintained. @mmalohlava I would like to know what you think would be the optimal way of doing this.

oleksiyskononenko commented 6 years ago

@nikhilshekhar Yes, we can add a convenience function to return only the aggregated rows. As @st-pasha said (I've updated my original comment according to his suggestion), we can keep the data internally as the grouped frame, so that we can also return the members or any other relevant information, if required.

As for the way of communication between the visual server and datatable, it is probably an orthogonal issue. Maybe @arnocandel has some insights.

mmalohlava commented 6 years ago

My Comments:

The problem:

I would prefer (B), but (A) is easier in the short term.

WDYT @oleksiyskononenko @st-pasha ?

st-pasha commented 6 years ago

@mmalohlava If I understand you correctly, you are suggesting the following mechanism:

lo5 commented 6 years ago

The call sequence is Browser -> DAI -> Procsy -> VisServer.

DAI (python) already knows what it wants, hence can -

  1. get aggregated data from pydatatable (py-py function call).
  2. send it to visdataserver via Procsy to get the vis data.

I don't see a strong reason to have Java read anything from disk.

lo5 commented 6 years ago

From talking to @nikhilshekhar, it turns out that there is a need to aggregate all columns in a dataset for autoviz, so step (2) above would possibly happen via a file.

@nikhilshekhar will check with @lelandwilkinson whether it's possible to eagerly aggregate all columns and hand the vis server this data, as opposed to the vis server initiating calls to pydatatable (convoluted).

lelandwilkinson commented 6 years ago

Thanks for all these comments and deep thinking. A couple of points:

vis-data-server is not allowed to see any data object except an aggregated Table. Please take a look at my latest code, where an aggregation happens when you see this line:

aggregatedTable = dataSource.getAggregatedTable(continuousVariableIndices);

For all purposes, vis-data-server has no idea (and doesn't care) whether the data were aggregated or not. Take a look at where this happens in DatatableDataSource. I don't have options like number of bins, radius, minimum number of rows to aggregate, maximum number of dimensions, or seed, because I don't need them and they add unnecessary complications to the algorithm. These are things we should consider for V2; adding them at this point will only complicate the development process. The whole design of Aggregator, if you look at the Java code, is to require only one pass through the data, unlike the one Arno and I did for H2O.

I agree with Nikhil that we ought to consider parallelization inside Datatable if it will help with performance. It's not a difficult algorithm to parallelize.

A fundamental component of the contract is that vis-data-server ought to be able to access a data table as fast as other parts of DAI do. This implies that the aggregator in datatable has to be super fast and, equally important, vis-data-server cannot read or even know about NFF files. But aggregated Tables are tiny, so if you decide it is better to provide a storage mechanism for them, they shouldn't take a lot of resources. That would mean that datatable has some sort of buffering mechanism to handle repeated queries for aggregated Tables. Again, however, it is not the responsibility of vis-data-server to decide how it accesses an aggregated Table. There is only one way it can see data (see the call above).

Take a look at the Table class. All the expensive functions are inside there:

getMemberIndices(), getCategories(), getRow(), getDataLimits(), getNumValues()

We don't want any of these inside datatable. This is because these functions are not expensive when applied to small datasets that are typical for aggregations. Recall that the entire design of vis-data-server is based on computing weighted statistics, where the weights are in the last column ("counts").
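
For instance, a weighted mean over exemplar rows (illustrative numbers only, not vis-data-server code) looks like this:

    # Hypothetical exemplar data: one value per exemplar row, with the
    # "counts" column giving the number of members behind each exemplar.
    values = [2.0, 5.0, 9.0]
    counts = [10, 3, 1]

    weighted_mean = sum(v * c for v, c in zip(values, counts)) / sum(counts)
    # (2.0*10 + 5.0*3 + 9.0*1) / 14 = 44.0 / 14 ≈ 3.143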

Keep in mind also that vis-data-server is row-oriented. I know column-oriented is all the rage today, but for statistical calculations (Mahalanobis distance, etc.), row-oriented is more efficient when applied to files containing a small number of rows. Note also that rows contain elements that are Java Objects. These Objects can be dates, numbers, strings, etc. One needn't know their types in order to process them.

A small point: The statement "Whenever a VisServer needs to aggregate a data frame" should be changed to "Whenever vis-data-server needs to see data." Again, vis-data-server knows nothing about aggregation. It thinks all datasets have a "counts" column or otherwise, all counts are 1. The Aggregator (inside datatable) will decide when a file has too few rows to merit aggregation.

The aggregation algorithm is not a cluster analysis. So we should avoid terminology like "observation clusters" and instead use the terminology in the paper, namely, "exemplars" (aggregated rows) and "members" (lists of row indices for each exemplar).

The statement, "If multiple column names are passed in options, ideally the aggregation should be done over each of the columns in parallel and the result should be returned back to the visual server. For example, if I pass in the file on which aggregation needs to be done along with 100 column names in the file, the aggregator should be able to compute the 100 aggregations in parallel and return the respective aggregated frame." is not quite true. You can't parallelize across columns because the ND aggregation is row-wise and the Euclidean distance calculation has to be computed across columns. You CAN parallelize across rows and that would gain us some traction with deep files.

The statement, "We already have a channel of communication between the java VizServer and the python DAI. Why can't we piggy-back on that? In terms of retrieving data on Java's end, the data has to get there from C/Python somehow, and I don't think there's faster way than through the NFF Jay file" is not quite true. Take a look at NFFDataSource, which Nikhil designed for improving performance in V2. We thought it would be faster, but it wasn't. I don't think you want to store NFF files and parse them every time vis-data-server wants to see a Table. Whatever DAI does to get data should be the mechanism for vis-data-server getting the same data, except vis-data-server gets an aggregated Table rather than whole column(s). If DAI deals with memory data objects, then vis-data-server has to do the same. I liked the comment, "I don't see a strong reason to have Java read anything from disk."

We've been talking Python throughout, and that's fine. But keep in mind that the primary visualization client is JavaScript, not Python. If we're obliged to make everything accessible to Python (not sure why), then let's be sure we pay no performance penalty for doing that.

Again, thanks for the thoughts. It would appear that our main task is figuring out how to talk to Java. Even if an object is in memory, we have to get Java DataSource to see it as a Java Table. I suggested JNI, but this can be inefficient. Maybe one of you knows some magic that is even faster for letting C++ in datatable hand over a Table.

nikhilshekhar commented 6 years ago

@lo5 circled around with @lelandwilkinson and the code -- eager aggregation is not possible for all visualizations. For example, in the Visual Server the first call is always get_autoviz(), so the aggregations needed by this method can be pre-computed; no issues here with eager pre-computation. But the subsequent calls to visualization methods like parallel coordinate plots, bar charts, etc. need to perform aggregations based on the return values from get_autoviz(). There is no way of computing those aggregations beforehand: aggregation over a different subset of columns results in different aggregated rows.

nikhilshekhar commented 6 years ago

Trying to work out what a good chain of calls would look like after the discussion on this thread, in sync with @lelandwilkinson @lo5 @mmalohlava. Kindly have a look at the flow below and point out changes/fixes, or suggest alternatives.

Browser makes a request for visualization --> python DAI --> procsy --> Visual Server calls the corresponding methods, each of which needs some aggregations to be computed --> call goes back via procsy --> python DAI --> calls the datatable aggregator, which writes out CSV files (one per aggregation request) --> the response with the aggregated file paths is returned via procsy --> Visual Server now reads the aggregated CSV files and computes the values needed for visualization --> returns the computed values via procsy --> python DAI --> the UI renders and plots them.

This chain of calls will be executed multiple times before all the visualizations can be rendered and shown in the UI.

lelandwilkinson commented 6 years ago

Been thinking this through a bit more, especially after talking with @nikhilshekhar. My CSV parser inside vis-data-server is almost as fast as datatable's and produces a Table. So it might be cleaner for datatable to aggregate and output a CSV file that can be read (from disk or memory) by vis-data-server. Because the aggregated file is so small, this should be fairly quick. The credit_card.CSV file in my tests is 2.9 MB and is read in less than a second, and that file is considerably larger than a typical aggregated file. This approach would be the cleanest; whether it is the fastest requires further testing.
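
In datatable terms the handoff could be as small as the following sketch; fread and to_csv are existing datatable calls, while aggregate() stands for the hypothetical Aggregator sketched earlier in this thread:

    import datatable as dt

    DT = dt.fread("credit_card.csv")    # full dataset (path illustrative)
    agg = aggregate(DT)                 # hypothetical aggregator producing exemplars + counts
    agg.to_csv("credit_card_agg.csv")   # tiny file for vis-data-server's CSV parser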

lelandwilkinson commented 6 years ago

If datatable outputs an aggregated CSV file, it would also be more useful to those Python users who are not using it with DAI (because it's open source).

oleksiyskononenko commented 6 years ago

@lo5 The Aggregator will aggregate all the columns for the supplied dataframe.

@lelandwilkinson The options parameter will be optional (I've added this explicitly to the original comment). If it is not passed to the Aggregator, the default values from https://github.com/h2oai/vis-data-server/blob/master/library/src/main/java/com/h2o/Globals.java will be used.

The Aggregator will output an aggregated/grouped dataframe that, according to @st-pasha, can easily be saved as CSV; however, I leave it to you guys to decide on the best way to communicate with vis-data-server.

arnocandel commented 6 years ago

My comments:

  1. Writing a .CSV of a thousand rows is definitely fast enough; nothing we do here is disk I/O or memory-bandwidth limited (those are both > GB/s). It's always the logic (sequential blocking calls to serial algorithms, etc.) that's the reason why AutoVis on a 100 MB dataset would take more than 100 ms.
  2. @lelandwilkinson H2O-3 aggregator is only multi-pass to get the number of exemplars "just right", otherwise it's the same single-pass algo as in Lee's original Java implementation. The only differences are: a) parallelism and repeated aggregation of epsilon-bubbles leading to slightly larger epsilon-bubbles and b) different categorical handling (poorer in h2o-3)
  3. @lelandwilkinson When Nikhil talked about 100 aggregations over columns in parallel, I believe he meant 100 calls to the Aggregator for different X/Y scatterplots etc., not parallelization over columns for a single aggregation. So this is one of the reasons that AutoVis1 (H2O-3 aggregation) was faster than AutoVis2: all those 100 calls were submitted at once and executed simultaneously by H2O-3, versus one at a time in AutoVis2 (see the sketch below).
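
A sketch of that batching idea from the Python side, assuming the hypothetical aggregate() from earlier in the thread and that it releases the GIL while working (otherwise these threads would not overlap):

    from concurrent.futures import ThreadPoolExecutor

    # One hypothetical column subset per requested plot:
    column_subsets = [["x1", "y1"], ["x2", "y2"], ["x3", "y3"]]

    def aggregate_all(DT):
        # Submit all aggregation calls at once instead of one at a time.
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(aggregate, DT[:, cols]) for cols in column_subsets]
            return [fut.result() for fut in futures]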

mmalohlava commented 6 years ago

@nikhilshekhar why do you need the hop from VisDataServer back "via procsy --> python DAI --> calls datatable"? VisDataServer can call datatable directly (via exec of a Python wrapper), no?

@lelandwilkinson CSV is an option, but to avoid re-parsing, NFF/Jay still seems more suitable.

@lelandwilkinson I meant parallelization over multiple invocations of the aggregator (as @arnocandel describes above) -- the motivation is that datatable is the one accessing the "big data", so it can do a better job of parallelizing the computation than we can in VisDataServer.

st-pasha commented 6 years ago

I think the problem still remains of figuring out exactly which aggregations to perform, and who is going to control them. If a user loads an n-column input Frame into DAI, the following tasks could possibly be carried out:

Overall, that's n^2 + n + 1 output frames. However, considering that DAI supports up to 10000 columns, this approach is not tenable. Someone must take responsibility for deciding exactly which aggregations to carry out.

nikhilshekhar commented 6 years ago

@mmalohlava yes, the Visual Server can definitely call datatable via an exec'd Python wrapper, and it would save a few method calls. But to preserve the server-client architecture, I thought it would be best if every call were routed via the server. If calling datatable directly from the visual server is the better way, we can surely do that.

arnocandel commented 6 years ago

It probably always does a full aggregation first and then, based on calculations on that, asks for up to ~100 more (hopefully all at once).

lelandwilkinson commented 6 years ago

A few things:

  1. Please let's use the tag "vis-data-server" in our paths. The word "autovis" is a descriptor for only one small part of the visualization server. Our task is not to serve autovis; it's to serve vis-data-server. Every class in there (BarCharts, BoxPlots, LoessRegressions, etc.) needs to look at the aggregated CSV file produced by datatable.
  2. @st-pasha, datatable needs to produce aggregated files on demand. It can't possibly produce all single-column or pairwise-column aggregations a priori, for the obvious reasons you stated. Instead, datatable needs to produce them when vis-data-server requests a particular aggregation. If you want to precalculate some of these, the problem gets much more complex, because only vis-data-server knows which ones will be needed. For Version 1, I'd suggest we stick with aggregation on demand. At the moment, DatatableDataSource points to a datatable NFF file, and an aggregation is requested by a list of column names in that file over which the aggregation is to be performed. Getting that list to datatable itself may require some sort of serialization of that list of strings, or use of Python. Not my area of expertise.
  3. @arnocandel, thanks for your observations on the H2O aggregator. You're correct regarding the superior performance of the H2O aggregator on large files because of the use of MapReduce. When I spoke of multiple passes, that was only in reference to the option of specifying the size of the aggregated file in advance. We no longer need to do this. It would be nice if we could use a distributed algorithm inside datatable the way you did for H2O and, especially, to leverage GPUs. For a baseline, though, I'd recommend starting with the simple single-threaded algorithm I gave @oleksiyskononenko and then going distributed for V2. That's only a recommendation, however.
  4. I strongly believe that we should produce CSV files for the aggregated exemplars and the corresponding aggregated members list. I've already checked in a super-fast importer for these files (which should have fewer than 500 rows). Following this architecture will give us more portability for the R and Python interfaces.
  5. The aggregated exemplar file looks like this:
     - Separator: comma (,).
     - Fields: every field is a String. Embedded quotes in a field are represented by "" (double quotes). If a field has no commas or embedded quotes, it need not be surrounded by quotes.
     - Header records: record1 contains the column names (separated by commas); record2 contains the column types as Strings: "integer", "real", "string", "date" (we can add more types later).
     - Record3 and subsequent records contain the data values, represented as Strings.
     - Missing value for a data field: two adjacent commas, whether it is a String or a numeric field. Do not use NULL, null, NaN, or other schemes for representing missing values. I could parse them (plus a bunch of other symbols like "*", "?", "." used by stat packages), but that would slow things down.
     - The vis-data-server CSV parser will pad short data rows (rows with fewer fields than record1) with missing values and truncate long rows.
     - Numeric data may be formatted as fixed decimal or scientific notation. Dates are formatted as MM/DD/YYYY; vis-data-server will parse them into a Java Calendar/Date and output results according to locale.
  6. The aggregated members file looks like this:
     - Comma separator.
     - For each row in the exemplar file, a list of long integers.
     - Rows in the exemplar file are one-to-one with rows in the members file.
     - The members file has rows of different lengths (1 or more elements). No member row can be empty (by definition of the aggregation algorithm).

lelandwilkinson commented 6 years ago

One more thing: the datatable aggregator will add an extra column at the end, with a "counts" label. The entries in this column will be positive long integers represented as Strings. The "counts" label will be added to the list in record1, and a "real" type will be added to the end of record2.
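
Putting the format together, with hypothetical column names and values, a three-exemplar file (including the appended "counts" column, and a missing age in the second data row shown as two adjacent commas) would look like:

    income,age,counts
    real,real,real
    52000.0,34.0,12
    48000.5,,3
    61000.0,29.5,1

And its companion members file, where row k lists the source-row indices behind exemplar k (here the original data had 16 rows, and each row's length matches that exemplar's count):

    0,1,2,3,4,5,6,8,9,10,12,13
    7,11,14
    15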