notfilippo opened 4 months ago
Thank you @notfilippo -- I think this proposal is well thought out and makes a lot of sense to me.
If we were to implement it I think the benefits for DataFusion would be enormous
From my perspective, the use of Arrow types in logical planning in DataFusion (e.g. type coercion) has always been a little bit of an impedance mismatch. When there were just a few variants (e.g. String/LargeString and Dictionary) it was annoying but manageable.
As Arrow evolves (e.g. to include REEArray, StringViewArray, etc) the mismatch is becoming more painful (e.g. https://github.com/apache/datafusion/issues/10220 is an example)
Care must be put in place not to introduce breaking changes for downstream crates and dependencies that build on top of DataFusion.
I think breaking changes to the API are inevitable, but I think we can manage the pain through careful API design and deprecation. More thoughts to follow
Thoughts on the technical details
This would also mean the introduction of new Field-compatible structure (LogicalPhysicalField)
Since LogicalPlans already use DFSchema, maybe we could avoid introducing a LogicalSchema at all and instead update DFSchema to know how to report LogicalType 🤔
Open question: Should ScalarValue be split into a LogicalScalarValue and a PhysicalScalarValue? Or should it just provide generic information during logical planning (e.g. its logical type, is_null(), is_zero()) without accessing the underlying physical representation?
Another possibility would be to migrate ScalarValue so it was entirely logical (the use case for a physical scalar value is not large). We could slowly remove the non-logical versions of ScalarValue (e.g. remove ScalarValue::LargeString and ScalarValue::Dictionary) -- this would give us some idea of where the boundary would be, and I think it would also improve usability (we have, for example, hit issues where ScalarValue::StringView was treated differently than ScalarValue::Utf8)
I think the biggest challenge of this project will be managing the transition from Arrow DataType to LogicalTypes. You have done a great job articulating the places that would need to be changed. I think we could manage the transition over time by, for example, deprecating (but leaving) the UDF APIs expressed in terms of DataType.
ColumnarValue enum could be extended so that functions could choose to provide their own optimised implementation for a subset of physical types and then fall back to a generic implementation that materialises the argument to known physical type.
Another possibility would be to make a function like ColumnarValue::into_one_of that ensured (via casting if necessary) the input was one of the types supported by the operator. For example:
let input: &ColumnarValue = &args[0];
// get input as one of the named types, casting if necessary
let input = input.into_one_of(&[DataType::Utf8View, DataType::Utf8])?;

match input.data_type() {
    DataType::Utf8View => { /* specialized impl for StringViewArray */ },
    DataType::Utf8 => { /* specialized impl for StringArray */ },
    _ => unreachable!()
}
RecordBatches with same logical type but different physical types
Another thing we could do is relax the requirement that the DataTypes match exactly between physical plans, and instead just require that the LogicalTypes match (in other words an ExecutionPlan could report it generates output like Utf8 but could actually produce output like Utf8View) 🤔
Since LogicalPlans already use DFSchema, maybe we could avoid introducing a LogicalSchema at all and instead update DFSchema to know how to report LogicalType 🤔
Initially, I planned to propose repurposing the DFSchema for this change. Still, I decided against it (at least for the first draft) because of this open question that I've noted above:
Open question: Should we really introduce a new schema type or should we reuse DFSchema? The qualifiers for fields in DFSchema should not be specified by the Tables themselves.
This issue originates from the fact that TableSource and TableProvider (the "native" sources of schemas) would have to return a DFSchema to include the LogicalPhysicalTypes. Adopting DFSchema at that level would mean that both TableSource and TableProvider could edit the qualifiers of the DFSchema, which doesn't seem right (even if the qualifiers would then be replaced or ignored).
This proposal makes sense to me. Thanks for driving this @notfilippo.
This issue originates from the fact that TableSource and TableProvider (the "native" sources of schemas) would have to return a DFSchema to include the LogicalPhysicalTypes
I was thinking that a DFSchema could still wrap an arrow Schema but translate to LogicalType.
So like if a TableProvider said it returned a StringView or a Dictionary(Int32, String), the DFSchema would report LogicalType::Utf8 or something.
I haven't looked at the code so there may be some reason this wouldn't work
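For illustration, a minimal sketch of that idea (with a hypothetical LogicalType enum and a stand-in DFSchema, not the real DataFusion structs) might look like:

use arrow::datatypes::{DataType, Schema};

// Hypothetical logical type enum; only a couple of variants shown.
#[derive(Debug, PartialEq)]
enum LogicalType {
    Utf8,
    Int32,
    Other(DataType), // placeholder for the rest of the mapping
}

// Utf8 / LargeUtf8 / Utf8View / Dictionary(_, Utf8) all collapse to the same logical type.
fn logical_type_of(dt: &DataType) -> LogicalType {
    match dt {
        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => LogicalType::Utf8,
        DataType::Dictionary(_, value) => logical_type_of(value),
        DataType::Int32 => LogicalType::Int32,
        other => LogicalType::Other(other.clone()),
    }
}

// Stand-in DFSchema: it keeps wrapping the arrow Schema but reports logical types to the planner.
struct DFSchema {
    inner: Schema,
}

impl DFSchema {
    fn logical_field_type(&self, i: usize) -> LogicalType {
        logical_type_of(self.inner.field(i).data_type())
    }
}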
The main challenge I see is that this will be a very large project. The high-level premise of separating logical and physical types makes sense from a first-principle POV, but the cost/benefit analysis at this point is not very clear to me. The latter will probably depend on the implementation strategy and the exact architecture we adopt, so I think we should try out different ideas and approaches before committing to a strategy/architecture and accepting/rejecting/deferring the idea based on a possibly premature choice.
Another possibility would be to migrate ScalarValue so it was entirely logical (the use case for a physical scalar value is not large).
AFAICT ScalarValue is used quite heavily in physical stuff like accumulators and other places. Much of the "state"-like information we track during the course of execution relies on ScalarValue. Maybe I'm misunderstanding what you meant here?
AFAICT ScalarValue is used quite heavily in physical stuff like accumulators and other places. Much of the "state"-like information we track during the course of execution relies on ScalarValue. Maybe I'm misunderstanding what you meant here?
I was thinking that there is no fundamental difference between using ScalarValue::String and ScalarValue::LargeString and ScalarValue::Dictionary(..) to accumulate string values, as they all eventually wrap String.
Thus I was thinking we could simplify the physical implementations by not having different codepaths, and this would also give us some first hand experience in how mapping Logical --> Physical types might look
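A tiny sketch of that point, using local stand-ins rather than the real ScalarValue: every string-ish variant carries the same Option<String> payload, so an accumulator never needs separate codepaths.

// Stand-in only: the real ScalarValue has many more variants and fields.
enum ScalarValue {
    Utf8(Option<String>),
    LargeUtf8(Option<String>),
    // a dictionary scalar eventually wraps one of the plain variants
    Dictionary(Box<ScalarValue>),
}

// With a purely logical ScalarValue this would collapse to a single match arm.
fn as_str(v: &ScalarValue) -> Option<&str> {
    match v {
        ScalarValue::Utf8(s) | ScalarValue::LargeUtf8(s) => s.as_deref(),
        ScalarValue::Dictionary(inner) => as_str(inner),
    }
}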
This generally looks good. I agree that starting by making ScalarValue represent a logical type would be a good starting point.
One small nit: I don't think I would lump together FixedSizeBinary with Binary and FixedSizeList with List. The fixed lengths often have semantics that should be considered. For example, we use FixedSizeList<f32> as a vector, where the list length is the vector dimension.
Is it possible to have the mapping of the relation between arrow's DataType and LogicalType without TypeRelation?
As long as there is only one LogicalType for each arrow::DataType, we can build up the mapping via From/Into conversion.
I was thinking that a DFSchema could still wrap an arrow Schema but translate to LogicalType
We can then easily get the LogicalType from the DataType inside Schema.
Is there any type mapping that can't be done without TypeRelation 🤔? In other words, is there any arrow::DataType that can be mapped to more than one LogicalType?
Is there any type mapping that can't be done without TypeRelation 🤔? In other words, is there any arrow::DataType that can be mapped to more than one LogicalType?
I think DataType::Dictionary(..) would map to more than one LogicalType (it would map to whatever the dictionary values type was)
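A small concrete illustration of why the mapping has to look inside the value type (the logical type names in the comments are only indicative):

use arrow::datatypes::DataType;

fn main() {
    // same Dictionary "shape", different value types
    let dict_of_strings =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let dict_of_ints =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int64));
    // dict_of_strings would map to a logical Utf8, dict_of_ints to a logical Int64,
    // so a flat DataType -> LogicalType table is not enough: the mapping must recurse
    // into the dictionary's value type.
    println!("{dict_of_strings:?} vs {dict_of_ints:?}");
}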
Thus I was thinking we could simplify the physical implementations by not having different codepaths and this would also give us some first hand experience in how mapping Logical --> Physical types might look like
This could be interesting to try -- both in terms of whether we can somehow simplify ScalarValue and also accumulate learnings to help towards this proposal.
It would also be a relatively contained draft, and if the result is satisfactory, can possibly get merged in even if the full proposal is for some reason not yet doable in our current state
It would also be a relatively contained draft, and if the result is satisfactory, can possibly get merged in even if the full proposal is for some reason not yet doable in our current state.
I agree with this approach. I'll dive deeper next week and report back my findings.
Is there any type mapping that can't be done without TypeRelation 🤔? In other words, is there any arrow::DataType that can be mapped to more than one LogicalType?
I think DataType::Dictionary(..) would map to more than one LogicalType (it would map to whatever the dictionary values type was)
Why don't we have LogicalType::Dict(LogicalPhysicalFieldRef), similar to List, Struct, and Map? Although Dict is not really a nested type like those 3, with LogicalType::Dict(LogicalPhysicalFieldRef) we could probably avoid replacing DFSchema with LogicalPhysicalSchema, or something like:
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum LogicalType {
    LogicalPrimitiveType,
    Date,
    Time32(TimeUnit),
    Time64(TimeUnit),
    Timestamp(TimeUnit, Option<Arc<str>>),
    Duration(TimeUnit),
    Interval(IntervalUnit),
    List(LogicalPhysicalFieldRef),
    Struct(LogicalPhysicalFields),
    Dict(LogicalPrimitiveType),
    Map(LogicalPhysicalFieldRef, bool),
    Decimal128(u8, i8),
    Decimal256(u8, i8),
    Union(LogicalUnionFields, UnionMode), // TODO: extension signatures?
    // UserDefinedType
}

pub enum LogicalPrimitiveType {
    Null,
    Int8,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Boolean,
    Float16,
    Float32,
    Float64,
    Utf8,
    Binary,
}
we could probably avoid replacing DFSchema with LogicalPhysicalSchema
That is an interesting idea. I don't fully understand the implications, but if we can figure out how to avoid having to switch from DFSchema I suspect it would make this project less impactful on downstream crates.
I like the idea of separating logical types from arrow types, but it would be great to understand the exact consequences. DataFusion is both a SQL execution engine (so it has a SQL frontend) and also a query execution library (so it has rich programmable API).
The SQL frontend should have "very logical types". For example, we don't need Time32(unit) and Time64(unit). The SQL user isn't interested in how different precisions are implemented. Having Time(unit) should be enough (and different units could use different physical representations under the hood).
Also, Timestamp(unit, tz) may want to be revisited from a SQL perspective. The SQL spec defines two separate types: "timestamp" (without zone) and "timestamp with time zone" (point-in-time semantics + zone information). It might be possible (with some limitations) to have both use the same physical representation (like arrow Timestamp(unit, tz)), but logically they want to be distinct.
Then, from DF-as-a-library perspective, physical representation becomes important.
To drive the design for this part, we need to understand how DF-as-a-library is used. What are its necessary contractual obligations and what can be an implementation detail. At this level we need to be even more extensible, since adding more types into the type system feels very natural for the DF-as-a-library use-case. We also may need to be more physical, like perhaps discerning between time32 and time64.
It would be great if, as proposed here, this layer still didn't have to deal with various equivalent ways of encoding semantically equivalent data. I.e. the DF-as-a-library use-case still doesn't want to discern between Utf8, Dictionary(Utf8), and RunEndEncoded(Utf8). This shouldn't be in the type system at all. A function operating on string values should work with any valid representation of string values (either intrinsically, or with an adapter).
ColumnarValue enum could be extended so that functions could choose to provide their own optimised implementation for a subset of physical types and then fall back to a generic implementation that materialises the argument to known physical type. This would potentially allow native functions to support user defined physical types that map to known logical types.
I am concerned that deriving support for a logical type from support for a physical type is actually a slippery slope.
Let's consider an example. Let's assume I have a my_duration({micros | nanos}) type which uses a 64-bit integer physical type for its representation. my_duration values are always stored with nano precision, and the unit defines how aligned they have to be. I.e. 1s is always stored as the integer value 1_000_000_000 for both allowed precisions. Every value of my_duration(micros) is represented as an i64 number divisible by 1000.
Now, I have an add_one() function that can take 64-bit integer values and add +1 to them. The +1 operation is a perfectly valid operation for i64 -- it's valid for the SQL long/bigint type. It's also valid for my_duration(nanos), but it's not valid for my_duration(micros), since it produces an unaligned value (not divisible by 1000).
I found a previous discussion and am referencing it here: #7421
I have a question: how do users specify the underlying physical types? FYI, ClickHouse exposes physical types to users like this.
I have a question: how do users specify the underlying physical types? FYI, ClickHouse exposes physical types to users like this.
The physical type in DataFusion is the Arrow type.
I have a question: how do users specify the underlying physical types? FYI, ClickHouse exposes physical types to users like this.
The physical type in DataFusion is the Arrow type.
Apologies for the unclear description. I meant to ask, if we opt for logicalType, how do users then specify the physical types? This is important because in certain scenarios, only the users can determine the most suitable data types.
Now, I have an add_one() function that can take 64-bit integer values and add +1 to them. The +1 operation is a perfectly valid operation for i64 -- it's valid for the SQL long/bigint type. It's also valid for my_duration(nanos), but it's not valid for my_duration(micros), since it produces an unaligned value (not divisible by 1000).
@findepi -- If I understand correctly, your example is about the logical type my_duration({micro | nano}) with the underlying physical type int64. In this case the type my_duration(micro) is logically different from my_duration(nano) (and both are logically different from an int64).
Functions that have specialised implementations for my_duration, specified in their signature, would have to handle either case differently. Instead, functions such as add_one(int64) would have a signature accepting only logical int64s and not my_duration.
Hypothetically, if add_one was to be edited to support my_duration (by accepting any numeric type) it would either have to provide a specific implementation for the my_duration logical type or it would have to materialise the array to a known numeric type like int64. The materialisation wouldn't necessarily match the underlying physical type, and for this example I think it would make sense to materialise it as an int64 array with the number of nanoseconds (physical type) multiplied by the unit of the duration.
if we opt for logicalType, how do users then specify the physical types?
@doki23 -- Through the use of a custom implementation of the TypeRelation trait, which then can be used inside a LogicalPhysicalSchema. All type sources (see proposal) would need to be edited to support returning this new type of schema, which will then be embedded into a DFSchema and used to carry both the physical and the logical type knowledge throughout logical and physical planning.
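Roughly, that could look like the sketch below. The TypeRelation shape shown here is an assumption based on the proposal text (the draft may differ), and MyString is just an illustrative user-defined type:

use arrow::datatypes::DataType;

#[derive(Debug, Clone, PartialEq)]
enum LogicalType {
    Utf8,
    // ...
}

// Assumed shape of the proposal's TypeRelation trait.
trait TypeRelation {
    fn logical(&self) -> LogicalType;
    fn physical(&self) -> DataType;
}

// A user-defined string type that is logically Utf8 but physically backed by StringViewArray.
struct MyString;

impl TypeRelation for MyString {
    fn logical(&self) -> LogicalType {
        LogicalType::Utf8
    }
    fn physical(&self) -> DataType {
        DataType::Utf8View
    }
}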
One small nit: I don't think I would lump together FixedSizeBinary with Binary and FixedSizeList with List. The fixed lengths often have semantics that should be considered. For example, we use FixedSizeList as a vector, where the list length is the vector dimension.
@wjones127 -- Noted! I was also hesitant about including the Fixed* variants in the base ones and your explanation makes sense to me. While I agree that having a fixed-length constraint for a list of logical types makes sense, I am not convinced about FixedSizeBinaries. What would be the use case? Do you have some example in mind?
While I agree that having a fixed-length constraint for a list of logical types makes sense, I am not convinced about FixedSizeBinaries. What would be the use case? Do you have some example in mind?
Example uses of fixed size binary are representing arrays of data types not supported in Arrow, such as bf16 or i128 (UUID). A value with a different number of bytes would be considered invalid.
Example uses of fixed size binary are representing arrays of data types not supported in Arrow, such as f16 or i128 (UUID). A value with a different number of bytes would be considered invalid.
Makes sense! Thanks for explaining – I'll edit the proposal.
Through the use of a custom implementation of the TypeRelation trait, which then can be used inside a LogicalPhysicalSchema.
@notfilippo Are you suggesting that to utilize the StringView type, users are required to create a custom logical type named MyString that establishes a type relationship between StringView and Utf8? It seems somewhat complicated, or perhaps I'm misinterpreting your explanation?
FYI, in SQL DDL for ClickHouse, you can specify the underlying physical type like this:
CREATE TABLE lc_t
(
`id` UInt16,
`strings` LowCardinality(String)
)
Using LogicalType in the SQL layer does not necessarily mean that users cannot declare the underlying datatype. It is possible to provide a special SQL syntax for this purpose.
@notfilippo Do you think it is possible to get LogicalType based on arrow::DataType?
#[derive(Debug, Clone, PartialEq)]
pub struct DFSchema {
    /// Inner Arrow schema reference.
    inner: SchemaRef,
    /// Optional qualifiers for each column in this schema. In the same order as
    /// the `self.inner.fields()`
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
We have the Arrow Schema in DFSchema; if we could get all the LogicalTypes based on the carried arrow types, the breaking change could be really small.
pub enum LogicalType { // Datafusion's primitive type
    Null,
    Int8,
    Int16,
    Int32,
    Int64,
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Boolean,
    Float16,
    Float32,
    Float64,
    Utf8,
    Binary,
    Date,
    Time,
    List(Box<LogicalType>),
    FixedSizeList(Box<LogicalType>, usize),
    // ... and more
    // No Arrow DataType here. Ideally only basic types that are not possible to be constructed from any other.
    // Any other user defined type could be built on top of df's primitive types
}
impl From<DataType> for LogicalType {
    fn from(value: DataType) -> Self {
        match value {
            DataType::Utf8View => LogicalType::Utf8,
            DataType::Utf8 => LogicalType::Utf8,
            // ... and so on for the remaining DataTypes
            _ => todo!(),
        }
    }
}
This is the rough idea; given you have already been playing around with the code, maybe you know whether this makes sense.
@jayzhan211 & @doki23 -- I'll try to answer both with a single comment: the proposal includes (you can see it where LogicalPhysicalType is defined) an implementation of the From<DataType> trait for LogicalType. This implies that if we introduce a LogicalPhysicalSchema type, containing LogicalPhysicalFields, a possible way to convert between types would be:
let schema = Schema::new(/* ... */);
let logical_physical_schema: LogicalPhysicalSchema = schema.into();
// Suppose that DFSchema is edited to support LogicalPhysicalSchemaRef instead of SchemaRef
let df_schema = DFSchema::try_from_qualified_schema("t1", &logical_physical_schema).unwrap();
The into() call will convert all known arrow::DataTypes in the schema to a NativeType which will keep a reference to the original DataType and its logical representation.
let string_view = DataType::StringView;
let logical_string_view: LogicalPhysicalType = string_view.into();
// logical_string_view will be:
// LogicalPhysicalType(Arc(NativeType { logical: Utf8, physical: StringView }))
For user defined types, instead of converting an arrow::Schema into a LogicalPhysicalSchema, you would directly define a LogicalPhysicalSchema where you can use your own arrow-compatible LogicalPhysicalTypes.
// logical_string_view will be: // LogicalPhysicalType(Arc(NativeType { logical: Utf8, physical: StringView }))
Instead of LogicalPhysicalType + LogicalPhysicalSchema to keep track of the relationship between the logical and physical type, could we look up DFSchema for the actual physical type, so that we just need to build the unidirectional mapping of logical type from the physical type through the into() call?
For example, StringView is the actual decoding we used and we could find it in DFSchema; we can also get the logical type (Utf8) based on the into() call.
Instead of LogicalPhysicalType + LogicalPhysicalSchema to keep track of the relationship between the logical and physical type, could we get the unidirectional mapping of logical type from physical type?
Oh now I understand. It makes sense to do it this way but it would lose the ability of potentially adding new arrow-compatible physical encodings (e.g. logical Utf8 with physical List(UInt8))
Instead of LogicalPhysicalType + LogicalPhysicalSchema to keep track of the relationship between the logical and physical type, could we get the unidirectional mapping of logical type from physical type?
Oh now I understand. It makes sense to do it this way but it would lose the ability of potentially adding new arrow-compatible physical encodings (e.g. logical Utf8 with physical List(UInt8))
Great, a step closer to it.
I think we could consider this kind of case as a user defined type, where there is a UserDefinedType that is built on top of datafusion's primitive type, in this case Utf8. But we actually have List(UInt8) inside the arrow::Schema, so we can decode the type with List(UInt8). Therefore, we have UserDefinedType(Utf8) in the logical planning step, and List(UInt8) in the physical planning step.
The most important thing is that we could keep DFSchema mostly the same as it is now. But we introduce a bunch of new LogicalTypes and a Trait for UserDefinedType to build around it.
But we actually have List(UInt8) inside arrow::Schema, so we can decode the type with List(UInt8). Therefore, we have UserDefinedType(Utf8) in the logical planning step, and List(UInt8) in the physical planning step
Makes sense and I like this approach. The only thing I'm still not understanding is how a type source (like a TableProvider) would communicate to the logical layer that the List(UInt8) is actually a logical Utf8.
The most important thing is that we could keep DFSchema mostly the same as it is now. But we introduce a bunch of new LogicalTypes and a Trait for UserDefinedType to build around it.
This would be tremendously beneficial to this proposal :)
But we actually have List(UInt8) inside arrow::Schema, so we can decode the type with List(UInt8). Therefore, we have UserDefinedType(Utf8) in the logical planning step, and List(UInt8) in the physical planning step
Makes sense and I like this approach. The only thing I'm still not understanding is how a type source (like a TableProvider) would communicate to the logical layer that the List(UInt8) is actually a logical Utf8.
The most important thing is that we could keep DFSchema mostly the same as it is now. But we introduce a bunch of new LogicalTypes and a Trait for UserDefinedType to build around it.
This would be tremendously beneficial to this proposal :)
I think we could register the type mapping in the session state, like how we register functions. We define the relation from Arrow DataType to LogicalType only. We could then look up the type relations we have and figure out that List(UInt8)'s logical type is Utf8.
This past week I've started some work on transitioning Scalar Types to being strictly logical. Most of the refactoring work is done and now I'm reworking some of the casting logic to make execution work again. Will report back soon!
Sorry late to the party. Very nice and well thought out proposal. But I'm a little confused about the role of LogicalPhysicalType. Doesn't this still pull the physical type into the logical plan? The way I understand the broader goal here is to do logical planning/optimization only in terms of logical types which erase differences between different physical representations of the same data (eg Dictionary/REE, String/StringView, etc). Then the handling of physical representations gets pushed down into the physical operators through runtime dispatch (or specialization during physical planning/optimization).
I'm 👍 👍 👍 for this.
I'd like to use a logical type (backed by variable length binary) to encode JSON-like data in a more efficient format, but make sure it's still displayed as JSON or some other human-readable format.
👋 I opened a draft PR ^ to make ScalarValue logical. I have a bunch of open questions that I would be very happy to get feedback on.
Keeping track of the physical type
While logical types simplify the list of possible types that can be handled during logical planning, the relation to their underlying physical representation needs to be accounted for when transforming the logical plan into nested ExecutionPlan and PhysicalExpr which will dictate how the query will execute.
Sorry if this is a dumb question. Do we actually need to track physical representation at planning time?
Let's look at an example. For example Utf8, LargeUtf8, Dictionary of Utf8, Utf8View, REE/RLE of Utf8 -- these are different physical representations that correspond to some string logical type (or varchar / varchar(n); doesn't matter). Knowing the data will be dictionary-encoded at (physical) planning time is useful (knowing more is always useful), but it actually prevents run-time adaptiveness. One parquet file can have data flat and another dict-encoded, and the Parquet writer can be adaptive, so at planning time there is no way to know the physical representation "for sure". There is only a way to "propose a physical representation", but then data needs to be made to fit into that.
At what cost and for what benefit? Cost is of two kinds:
- arrow_cast(literal, dict_type)? What does it mean to coerce a literal to REE/RLE? This is what brings the requirement for ScalarValue to carry any physical type, even though for a constant scalar value this is not meaningful.
Benefit?
Let me draw a comparison, if I may... Trino's internal design is IMO very well thought through. Trino has the same concepts of physical representations (flat, dict, rle), but they don't percolate into the type system. The type system is "logical types" (or just "types"), but more importantly the functions are authored in terms of the same types. The function/expression implementor doesn't need to bother with dict or rle inputs, because they are taken care of once and for all by the projection/filter operator. What do they miss by not dealing with "physical types" at planning time?
Summing up, I propose that
- we introduce the concept of "data fusion type". This is the "logical type" @notfilippo proposed.
- we use this "data fusion type" for logical plans
- we use this "data fusion type" for physical plans as well
- this leaves existing "physical plans" to be a runtime concept
- we use this "data fusion type" for function authoring, scalar/constant/literal values in the plan
cc @notfilippo @alamb @comphead @andygrove @jayzhan211
Do we actually need to track physical representation at planning time?
Logical operators during logical planning should unquestionably not have access to the physical type information, which should exclusively be reserved to the physical planning and/or physical execution phase.
Currently some limitations in datafusion's abstraction design don't allow a clear-cut distinction between the types (this I think is clear when you look at how you can call LogicalPlan::schema). Knowing this, some care needs to be taken in order to slowly introduce the distinction, which mainly comes in the form of storing the DataType alongside logical values.
The objective would be to eventually remove that knowledge and make it available directly through the data source (i.e. the RecordBatch) to support the run-time adaptiveness you are mentioning above and that I've also mentioned in the proposal:
RecordBatches with same logical type but different physical types
Integrating LogicalPhysicalSchema into DataFusion's RecordBatches, streamed from one ExecutionPlan to the other, could be an interesting approach to support the possibility of two record batches having logically equivalent schemas with different underlying physical types. This could be useful in situations where data, stored in multiple pages mapping 1:1 with RecordBatches, is encoded with different strategies based on the density of the rows and the cardinality of the values.
distinction between the types (this I think is clear when you look at how you can call LogicalPlan::schema). Knowing this, some care needs to be taken in order to slowly introduce the distinction, which mainly comes in the form of storing the DataType alongside logical values.
Well, if LogicalPlan is expressed in terms of logical types (what I'd like to be the datafusion types), then LogicalPlan::schema should return a schema expressed in those types as well. There would be no need to track (arrow's) DataType.
The objective would be to eventually remove that knowledge and make it available directly through the data source (i.e. the RecordBatch) to support the run-time adaptiveness you are mentioning above and that I've also mentioned in the proposal:
Awesome. Let's graduate it from "proposal" to "project plan".
Well, if LogicalPlan is expressed in terms of logical types (what I'd like to be the datafusion types), then LogicalPlan::schema should return a schema expressed in those types as well. There would be no need to track (arrow's) DataType.
Yes, that's the plan! 👍
Awesome. Let's graduate it from "proposal" to "project plan".
I really like @alamb's idea of creating an epic, and in https://github.com/apache/datafusion/pull/12536#issuecomment-2370648121 I've suggested some starting issues.
This sounds good to me.
We are running into the "RecordBatches with same logical type but different physical types" issue in DataFusion Comet. For a single table, a column may be dictionary-encoded in some Parquet files, and not in others, so we are forced to cast them all to the same type, which introduces unnecessary dictionary encoding (or decoding) overhead.
@andygrove yeah, that's exactly the problem I anticipated based on DF design and Trino experience. Happy that you have real world validation that this is a problem for DF Comet too, thanks!
I don't get it -- how do you get the actual value if there is no physical type (DataType)?
Given ScalarValue::Utf8(String), how do you convert this to a StringArray or StringViewArray given no hint about what they are exactly?
In my mind, at some point we need to build the arrow Array. How do we build it if we don't know what type it is?
The idea of dynamically switching to different physical encodings is neat, but I think it presumes all operators / functions can handle any of the different encodings (RLE, Dict, String, etc), which is not the case today.
Benefit?
I think another benefit of the current type system is that the implementations of functions (and operators, etc) declare what types of arrays (physical encodings) they have specializations for, and then the optimizers and analyzers ensure that the types line up and try to minimize conversions at runtime.
For example, in a query like this that computes several operations on a column
SELECT COUNT(*), substr(url, 1, 4), regexp_match(url, '%google.com%')
FROM ...
GROUP BY url
If we change the hash group by operator to return a StringViewArray for certain grouping operations when the input is a StringArray, and the column is used twice -- once in substr and once in regexp_match, which don't have specialized code paths for StringViewArray -- we have to be careful that the array will not be cast to StringArray twice.
In my mind, at some point we need to build the arrow Array. How do we build it if we don't know what type it is?
I think @jayzhan211 's is the key question in my mind. At some point you need to get the actual array and have code that operates on exactly that type. Figuring out at what point this conversion happens is important.
I think another benefit of the current type system is that the implementations of functions (and operators, etc) declare what types of arrays (physical encodings) they have specializations for and then the optimizers and analyzers ensure that the types lineup and try to minimize conversions at runtime.
IMO this is quite important and we should be careful to preserve this kind of optimizations in any new design. Admittedly, we don't do a great job in removing/avoiding runtime checks in various other use cases within today's architecture, but let's not lose that capability completely -- I remember some cases where this kind of specializations yielded very significant performance improvements.
I think another benefit of the current type system is that the implementations of functions (and operators, etc) declare what types of arrays (physical encodings) they have specializations for and then the optimizers and analyzers ensure that the types lineup and try to minimize conversions at runtime.
IMO this is quite important and we should be careful to preserve this kind of optimizations in any new design. Admittedly, we don't do a great job in removing/avoiding runtime checks in various other use cases within today's architecture, but let's not lose that capability completely -- I remember some cases where this kind of specializations yielded very significant performance improvements.
I'm a bit confused as to what the goal of this work is if we still need to track the physical type during planning?
I'm a bit confused as to what the goal of this work is if we still need to track the physical type during planning?
I would like to stress that the intent of this proposal remains to decouple logical types from physical types in order to achieve the following goal:
Logical operators during logical planning should unquestionably not have access to the physical type information, which should exclusively be reserved to the physical planning and physical execution.
LogicalPlans will use LogicalType while PhysicalPlans will use DataType.
While the goal seems to have achieved wide consensus, the path to reach it has not been finalised. Through some experiments (#11160 -> #11978 -> #12536) we've been trying to narrow down a possible approach to commit to in order to make progress.
As this proposal aims at changing the tires on a moving car, there is and there will be a lot of discussion in order to complete the migration safely and without breaking too much functionality for end users. This will certainly result in an intermediate state where the existing behaviour is supported by temporarily tracking the DataType alongside some objects which will only have a logical type, until the type can be extracted by the context itself.
Re: @findepi's proposal,
Summing up, I propose that
- we introduce the concept of "data fusion type". This is the "logical type" @notfilippo proposed.
- we use this "data fusion type" for logical plans
- we use this "data fusion type" for physical plans as well
- this leaves existing "physical plans" to be a runtime concept
- we use this "data fusion type" for function authoring, scalar/constant/literal values in the plan
This proposal is compatible with (and actually depends on) the decoupling logical from physical types but I think it's a further step ahead to consider once we at least clear the initial steps to take in order to make LogicalTypes happen.
Additionally I think it should be filed as a separate, but related, ticket. I understand that it heavily depends and influences the choices of this proposal but judging by the comments above I think there needs to be a separate discussion in order to validate the idea on its own.
I think another benefit of the current type system is that the implementations of functions (and operators, etc) declare what types of arrays (physical encodings) they have specializations for and then the optimizers and analyzers ensure that the types lineup and try to minimize conversions at runtime
Not sure where we discussed this already but I would love to support both logical types and physical types when declaring function signatures in order to let the user have full control over the arguments: as little control as simply specifying a LogicalType + cast or as much control as precise function signatures for specific DataTypes.
Instead I was planning on keeping return_type and invoke as is, potentially adding a return_logical_type helper if needed.
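As an illustration of that "both levels of control" idea, a signature slot could look something like the enum below (purely a sketch, not an existing DataFusion API):

use arrow::datatypes::DataType;

#[derive(Debug, Clone, PartialEq)]
enum LogicalType {
    Utf8,
    Int64,
    // ...
}

// One argument slot in a hypothetical signature.
#[derive(Debug, Clone)]
enum ArgumentType {
    // as little control as a LogicalType + cast: any physical encoding of this
    // logical type is accepted and coerced/materialised by the planner
    Logical(LogicalType),
    // as much control as precise signatures: the exact physical encodings the
    // function has specialised implementations for
    Physical(Vec<DataType>),
}

// e.g. a generic function could declare ArgumentType::Logical(LogicalType::Utf8), while a
// view-optimised one could declare ArgumentType::Physical(vec![DataType::Utf8View, DataType::Utf8]).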
This proposal is compatible with (and actually depends on) the decoupling logical from physical types but I think it's a further step ahead to consider once we at least clear the initial steps to take in order to make LogicalTypes happen.
Additionally I think it should be filed as a separate, but related, ticket. I understand that it heavily depends and influences the choices of this proposal but judging by the comments above I think there needs to be a separate discussion in order to validate the idea on its own.
I don't mind creating new ticket if needed. I created https://github.com/apache/datafusion/issues/12644 to track Extension Types which -- although mentioned already in this issue -- feel like even a higher goal to achieve.
I am not convinced, however, that we should separate these discussions: "use new DataFusion types in logical plans" and "use new DataFusion types in physical plans as well". They feel very related and both impact the type system design. I don't want to paint ourselves into a corner just because we narrowed the horizon.
Using logical types for all of the planning (including physical planning) is very important for performance -- see @andygrove's Comet comment https://github.com/apache/datafusion/issues/11513#issuecomment-2370933596 but obviously this is not a Comet-only thing. As with any performance improvement, it may be impossible not to regress some specialized cases along the way. There is no point in breaking something for the sake of breaking. Just that if we allow only incremental improvements, we achieve a local optimum only.
@findepi
The core issue seems to be mapping between Arrow's DataType and DataFusion's logical type. While we could technically bypass logical types and handle everything using Arrow's DataType, it's beneficial to have a simplified version of DataType for easier processing. This is where the logical type comes in—it serves as a simplified version of DataType to enumerate all semantically equivalent types. In this case, the logical type should be a more constrained or reduced representation than Arrow's DataType.
We want a one-way mapping from Arrow's DataType to user-defined or extension types, where the logical type (DataFusion's native type) acts as the single source of truth within DataFusion, much like native types in Rust.
To achieve this, we need two traits for type mapping:
If these types map to the same logical type, it implies that we can correctly decode the value as the expected type. Otherwise, it signals a type mismatch.
use std::sync::Arc;

use arrow::datatypes::DataType;

#[derive(Clone)]
pub enum LogicalType {
    Int32,
    String,
    Float32,
    Float64,
    FixedSizeList(Box<LogicalType>, usize),
    // and more
    Extension(Arc<dyn ExtensionType>),
}

pub trait ExtensionType {
    fn logical_type(&self) -> LogicalType;
}

pub struct JsonType {}

impl ExtensionType for JsonType {
    fn logical_type(&self) -> LogicalType {
        LogicalType::String
    }
}

pub struct GeoType {
    n_dim: usize,
}

impl ExtensionType for GeoType {
    fn logical_type(&self) -> LogicalType {
        LogicalType::FixedSizeList(Box::new(LogicalType::Float64), self.n_dim)
    }
}

pub trait PhysicalType {
    fn logical_type(&self) -> LogicalType;
}

impl PhysicalType for DataType {
    fn logical_type(&self) -> LogicalType {
        match self {
            DataType::Int32 => LogicalType::Int32,
            DataType::FixedSizeList(f, n) => {
                LogicalType::FixedSizeList(Box::new(f.data_type().logical_type()), *n as usize)
            }
            _ => todo!(""),
        }
    }
}
I’d love to hear if this design is sound, or if there are any potential pitfalls in how I’ve approached type mapping.
Abstract
Logical types are an abstract representation of data that emphasises structure without regard for physical implementation, while physical types are tangible representations that describe how data will be stored, accessed, and retrieved.
Currently the type model in DataFusion, both in logical and physical plans, relies purely on arrow's DataType enum. But whilst this choice makes sense for its physical execution engine (DataTypes map 1:1 with the physical array representation, defining implicit rules for storage, access, and retrieval), it has flaws when dealing with its logical plans. This is due to the fact that some logically equivalent array types in the Arrow format have very different DataTypes -- for example a logical string array can be represented in an Arrow array of DataType Utf8, Dictionary(Utf8), RunEndEncoded(Utf8), and StringView (without mentioning the different index types that can be specified for dictionaries and REE arrays).
This proposal evaluates possible solutions for decoupling the notion of types in DataFusion's logical plans from DataTypes, evaluating their impact on DataFusion itself and on downstream crates.
Goals
Proposal
Defining a logical type
To define the list of logical types we must first take a look at the physical representation of the engine: the Arrow columnar format. DataTypes are the physical types of the DataFusion engine and they define storage and access patterns for buffers in the Arrow format.
Looking at a list of the possible DataTypes it's clear that while some map 1:1 with their logical representation, others also specify information about the encoding (e.g. Large*, FixedSize*, Dictionary, RunEndEncoded, ...). The latter must be consolidated into what they represent, discarding the encoding information; in general, types that can store different ranges of values should be different logical types. (ref)
What follows is a list of DataTypes and how they would map to their respective logical types following the rules above:
DataType → Logical type

Null → Null
Boolean → Boolean
Int8 → Int8
Int16 → Int16
Int32 → Int32
Int64 → Int64
UInt8 → UInt8
UInt16 → UInt16
UInt32 → UInt32
UInt64 → UInt64
Float16 → Float16
Float32 → Float32
Float64 → Float64
Timestamp(unit, tz) → Timestamp(unit, tz)
Date32 → Date
Date64 → Date (Date64 doesn't actually provide more precision. (docs))
Time32(unit) → Time32(unit)
Time64(unit) → Time64(unit)
Duration(unit) → Duration(unit)
Interval(unit) → Interval(unit)
Binary → Binary
FixedSizeBinary(size) → Binary
LargeBinary → Binary
BinaryView → Binary
Utf8 → Utf8
LargeUtf8 → Utf8
Utf8View → Utf8
List(field) → List(field)
ListView(field) → List(field)
FixedSizeList(field, size) → List(field)
LargeList(field) → List(field)
LargeListView(field) → List(field)
Struct(fields) → Struct(fields)
Union(fields, mode) → Union(fields)
Dictionary(index_type, data_type) → data_type, converted to logical type
Decimal128(precision, scale) → Decimal128(precision, scale)
Decimal256(precision, scale) → Decimal256(precision, scale)
Map(fields, sorted) → Map(fields, sorted)
RunEndEncoded(run_ends_type, data_type) → data_type, converted to logical type

User defined types
User defined physical types
The Arrow columnar format provides guidelines to define extension types through the composition of native DataTypes and custom metadata in fields. Since this proposal already includes a mapping from DataType to logical type, we could extend it to support user defined types (through extension types) which would map to a known logical type.
For example an extension type with the DataType of List(UInt8) and a custom metadata field {'ARROW:extension:name': 'myorg.mystring'} could have a logical type of Utf8.
User defined logical types
Arrow extension types can also be used to extend the list of supported logical types. An additional logical type called Extension could be introduced. This extension type would contain a structure detailing its logical type and the extension type metadata.
Boundaries of logical and physical types
In plans and expressions
As the prefix suggests, logical types should be used exclusively in logical plans (LogicalPlan and Expr) while physical types should be used exclusively in physical plans (ExecutionPlan and PhysicalExpr). This would enable logical plans to be purely logical, without worrying about underlying encodings.
Expr in logical plans would need to represent their resulting value as logical types through the trait method ExprSchemable::get_type, which would need to return a logical type instead.
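A minimal sketch of that change with stand-in types (the real ExprSchemable::get_type also takes a schema argument and returns a Result, both simplified here):

#[derive(Debug)]
enum LogicalType {
    Utf8,
    // ...
}

struct LogicalSchema; // stand-in for the schema visible to the logical plan
struct Expr;          // stand-in for datafusion_expr::Expr

trait ExprSchemable {
    // today this returns an arrow DataType; with the proposal it returns a logical type
    fn get_type(&self, schema: &LogicalSchema) -> LogicalType;
}

impl ExprSchemable for Expr {
    fn get_type(&self, _schema: &LogicalSchema) -> LogicalType {
        LogicalType::Utf8 // e.g. a string literal
    }
}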
In functions
ScalarUDF, WindowUDF, and AggregateUDF all define their Signatures through the use of DataTypes. Function arguments are currently validated against signatures through type coercion during logical planning. With logical types, Signatures would be expressed without the need to specify the underlying encoding. This would simplify the type coercion rules, removing the need to traverse dictionaries and handle different containers, focusing instead on explicit logical rules (e.g. all logical types can be coerced to Utf8).
During execution the function receives a slice of ColumnarValue that is guaranteed to match the signature. Being strictly a physical operation, the function will have to deal with physical types. The ColumnarValue enum could be extended so that functions could choose to provide their own optimised implementation for a subset of physical types and then fall back to a generic implementation that materialises the argument to a known physical type. This would potentially allow native functions to support user defined physical types that map to known logical types.
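For illustration, a sketch of the "specialise, then fall back by materialising" pattern using arrow's cast kernel; the upper function and its exact shape are illustrative, not an existing DataFusion API:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn upper(input: ArrayRef) -> Result<ArrayRef, ArrowError> {
    match input.data_type() {
        // specialised path: operate directly on the physical encoding we know well
        DataType::Utf8 => {
            let strings = input.as_any().downcast_ref::<StringArray>().unwrap();
            let out: StringArray = strings
                .iter()
                .map(|v| v.map(|s| s.to_uppercase()))
                .collect();
            Ok(Arc::new(out))
        }
        // generic fallback: materialise any other string-like encoding
        // (Utf8View, Dictionary(_, Utf8), ...) into the physical type we support
        _ => {
            let materialised = cast(&input, &DataType::Utf8)?;
            upper(materialised)
        }
    }
}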
In substrait
The datafusion_substrait crate provides helper functions to enable interoperability between substrait plans and datafusion's plans. While some effort has been made to support converting from / to DataTypes via type_variation_reference (example here), dictionaries are not supported as both literal types and cast types, leading to potential errors when trying to encode a valid LogicalPlan into a substrait plan. The usage of logical types would enable a more seamless transition between DataFusion's native logical plan and substrait.
Keeping track of the physical type
While logical types simplify the list of possible types that can be handled during logical planning, the relation to their underlying physical representation needs to be accounted for when transforming the logical plan into nested ExecutionPlan and PhysicalExpr, which will dictate how the query will execute.
This proposal introduces a new trait that represents the link between a logical type and its underlying physical representation:
While NativeType would be primarily used for standard DataTypes and their logical relation, TypeRelation is defined to provide support for user defined physical types.
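As a rough sketch of the shape this describes (the exact draft definitions may differ; the signatures below are assumptions based on the NativeType { logical, physical } example discussed above):

use std::sync::Arc;

use arrow::datatypes::DataType;

#[derive(Debug, Clone, PartialEq)]
enum LogicalType {
    Utf8,
    // ...
}

// Relation between a logical type and the physical DataType backing it.
trait TypeRelation: std::fmt::Debug {
    fn logical(&self) -> LogicalType;
    fn physical(&self) -> DataType;
}

// Built-in relation produced when converting a plain arrow DataType.
#[derive(Debug)]
struct NativeType {
    logical: LogicalType,
    physical: DataType,
}

impl TypeRelation for NativeType {
    fn logical(&self) -> LogicalType {
        self.logical.clone()
    }
    fn physical(&self) -> DataType {
        self.physical.clone()
    }
}

// The LogicalPhysicalType used by schemas would wrap a shared relation, e.g.
// LogicalPhysicalType(Arc::new(NativeType { logical: LogicalType::Utf8, physical: DataType::Utf8View }))
#[derive(Debug, Clone)]
struct LogicalPhysicalType(Arc<dyn TypeRelation>);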
What follows is an exploration of the areas in which LogicalPhysicalType would need to be introduced:
A new type of Schema
To support the creation of LogicalPhysicalType a new schema must be introduced, which can be consumed as either a logical schema or used to access the underlying physical representation. Currently DFSchema is used throughout DataFusion as a thin wrapper for Arrow's native Schema in order to qualify fields originating from potentially different tables. This proposal suggests decoupling the DFSchema from its underlying Schema and instead adopting a new Schema-compatible structure (LogicalPhysicalSchema), but with DataTypes replaced by LogicalPhysicalType. This would also mean the introduction of a new Field-compatible structure (LogicalPhysicalField) which also supports LogicalPhysicalType instead of Arrow's native Field DataType.
DFSchema would be used by DataFusion's planning and execution engine to derive either logical or physical type information for each field. It should retain the current interoperability with Schema (and additionally the new LogicalPhysicalSchema), allowing easy Into & From conversion.
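A sketch of the Field/Schema-compatible structures described here; the names follow the proposal, but the exact fields and the conversion are assumptions:

use arrow::datatypes::Schema;

#[derive(Debug, Clone)]
struct LogicalPhysicalType; // stand-in; see the TypeRelation sketch earlier

#[derive(Debug, Clone)]
struct LogicalPhysicalField {
    name: String,
    data_type: LogicalPhysicalType, // replaces arrow's Field DataType
    nullable: bool,
}

#[derive(Debug, Clone)]
struct LogicalPhysicalSchema {
    fields: Vec<LogicalPhysicalField>,
}

// Interoperability with the existing arrow Schema, so DFSchema and downstream code
// can keep converting with Into / From.
impl From<&Schema> for LogicalPhysicalSchema {
    fn from(schema: &Schema) -> Self {
        LogicalPhysicalSchema {
            fields: schema
                .fields()
                .iter()
                .map(|f| LogicalPhysicalField {
                    name: f.name().clone(),
                    data_type: LogicalPhysicalType, // would be derived from f.data_type()
                    nullable: f.is_nullable(),
                })
                .collect(),
        }
    }
}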
Type sources
Types in plans are sourced through Arrow's native Schema returned by implementations of TableSource / TableProvider, variable DataTypes returned by VarProvider, and ScalarValue. To allow definition of custom LogicalPhysicalTypes these type sources should be edited to return LogicalPhysicalSchema / LogicalPhysicalType.
Tables
For tables a non-breaking way of editing the trait to support LogicalPhysicalSchema could be:
- logical_physical_schema() -> LogicalPhysicalSchema: this method's default implementation calls schema() and converts it to LogicalPhysicalSchema without introducing any custom LogicalPhysicalType. Implementers are free to override this method and add custom LogicalPhysicalTypes.
- Editing the schema() method to return impl Into<LogicalPhysicalSchema>.
The most impactful changes introduced by this proposal are the LogicalPhysicalType, LogicalPhysicalSchema and LogicalPhysicalField types. These structures would replace most of the mentions of DataType, Schema and Field in the DataFusion codebase. Type sources (TableProvider / TableSource, VarProvider, and ScalarValue) and Logical / ExecutionPlan nodes would be greatly affected by this change. This effect can be mitigated by providing good Into & From implementations for the new types and editing existing function arguments and return types as impl Into<LogicalPhysical*>, but it will still break a lot of things.
Case study: datafusion-comet
datafusion-comet is a high-performance accelerator for Apache Spark, built on top of DataFusion. A fork of the project containing changes from this proposal currently compiles without modifications. As more features in this proposal are implemented, namely UDFs' Logical Signature, some refactoring might be required (e.g. for CometScalarFunction and other functions defined in the codebase). Refer to the draft's TODOs to see what's missing.
Non-goals (topics that might be interesting to dive into)
While not the primary goals of this proposal, here are some interesting topics that could be explored in the future:
RecordBatches with same logical type but different physical types
Integrating LogicalPhysicalSchema into DataFusion's RecordBatches, streamed from one ExecutionPlan to the other, could be an interesting approach to support the possibility of two record batches having logically equivalent schemas with different underlying physical types. This could be useful in situations where data, stored in multiple pages mapping 1:1 with RecordBatches, is encoded with different strategies based on the density of the rows and the cardinality of the values.
Current work effort
The [Epic] issue tracking this effort can be found at #12622 .