feat: Add SQL planner, physical planner, and TableProvider hook for MERGE INTO #22988
wirybeaver wants to merge 2 commits into
Add merge_into async method to TableProvider trait for MERGE INTO DML support. The method accepts:
- source: ExecutionPlan representing the USING clause
- on: Expr representing the ON join condition
- clauses: Vec<MergeIntoClause> for WHEN MATCHED/NOT MATCHED actions
Default implementation returns not_impl_err for tables that don't support MERGE INTO operations.
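The default-implementation pattern described in this commit can be sketched roughly as follows. This is a simplified, synchronous model using only the standard library: `Provider`, `PlanError`, `MergeClause`, `ReadOnlyTable`, and `CountingTable` are hypothetical stand-ins, and the string parameters replace the real `ExecutionPlan` and `Expr` arguments. It is not the actual DataFusion signature.

```rust
/// Hypothetical error type standing in for a "not implemented" planner error.
#[derive(Debug, PartialEq)]
pub enum PlanError {
    NotImplemented(String),
}

/// Hypothetical, heavily simplified clause type.
#[derive(Debug, Clone, PartialEq)]
pub enum MergeClause {
    MatchedUpdate,
    MatchedDelete,
    NotMatchedInsert,
}

/// Simplified model of a table provider with an opt-in MERGE hook.
pub trait Provider {
    fn name(&self) -> &str;

    /// Default: report that MERGE INTO is unsupported, so existing
    /// implementations keep compiling without overriding anything.
    fn merge_into(
        &self,
        _source: &str,
        _on: &str,
        _clauses: &[MergeClause],
    ) -> Result<u64, PlanError> {
        Err(PlanError::NotImplemented(format!(
            "MERGE INTO not supported for table {}",
            self.name()
        )))
    }
}

/// A provider that relies on the default and therefore rejects MERGE.
pub struct ReadOnlyTable;

impl Provider for ReadOnlyTable {
    fn name(&self) -> &str {
        "read_only"
    }
}

/// A provider that opts in; here it only counts the clauses it was given.
pub struct CountingTable;

impl Provider for CountingTable {
    fn name(&self) -> &str {
        "counting"
    }

    fn merge_into(
        &self,
        _source: &str,
        _on: &str,
        clauses: &[MergeClause],
    ) -> Result<u64, PlanError> {
        Ok(clauses.len() as u64)
    }
}
```

The point of the default body is backward compatibility: only providers that can actually execute a merge need to override the method.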
@wirybeaver Thanks for the work on MERGE planning. I found two issues that look like they should be fixed before this lands.
    let target_table_source = self
        .context_provider
        .get_table_source(target_table_ref.clone())?;
    let target_schema = Arc::new(DFSchema::try_from_qualified_schema(
I think target aliases need a bit more handling here. Right now MERGE INTO target AS t USING source AS s ON t.id = s.id ... accepts the alias, but target_schema is still qualified with the table name rather than t. That means normal alias-qualified references in the ON clause or WHEN AND expressions will not resolve.
Could we either reject target aliases explicitly for now, or qualify the target side with the alias so this matches the rest of SQL planning?
Fixed — when an alias is present we now use it as the schema qualifier via TableReference::bare(alias.name.value.clone()), so t.col resolves correctly in ON and WHEN expressions.
        MergeIntoAction::Update(assignments)
    }
    ast::MergeAction::Insert(insert_expr) => {
        let columns: Vec<String> = insert_expr
I think the MERGE INSERT actions need the same kind of validation that regular INSERT planning does. At the moment, unknown columns, duplicate columns, and mismatched column/value counts can all plan successfully. An empty column list also does not appear to require values for every target column.
That can leave TableProviders receiving malformed actions instead of a validated logical operation. Could we normalize and validate target column names, reject duplicates, and verify that the value count matches either the explicit columns or the full target schema?
Fixed — we now validate before building the action: duplicate column names are rejected, each named column is checked against the target schema via field_with_unqualified_name, and the value count must equal either the explicit column count (when a column list is given) or the full target schema width (when none is given).
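A minimal sketch of the three checks listed in this reply, assuming plain string column names rather than the planner's real schema types. The function name `validate_insert` and its parameters are hypothetical and are not taken from the PR.

```rust
use std::collections::HashSet;

/// Validates a MERGE ... INSERT column list against the target columns.
/// `insert_columns` is empty when the statement gives no explicit column list.
pub fn validate_insert(
    target_columns: &[&str],
    insert_columns: &[&str],
    value_count: usize,
) -> Result<(), String> {
    let mut seen = HashSet::new();
    for col in insert_columns {
        // Every named column must exist in the target schema.
        if !target_columns.contains(col) {
            return Err(format!("column '{col}' does not exist in target table"));
        }
        // A column may be named at most once.
        if !seen.insert(*col) {
            return Err(format!("column '{col}' is specified more than once"));
        }
    }

    // With an explicit list the values must match it; otherwise they must
    // cover the full width of the target schema.
    let expected = if insert_columns.is_empty() {
        target_columns.len()
    } else {
        insert_columns.len()
    };
    if value_count != expected {
        return Err(format!("expected {expected} values, got {value_count}"));
    }
    Ok(())
}
```

For a target with columns `id` and `val`, an explicit list `(id)` with two values fails the count check, and an empty list with two values passes.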
    })
    .collect::<Result<Vec<_>>>()?;

    // 6. Build the DmlStatement
Small cleanup suggestion: this could return Ok(LogicalPlan::Dml(DmlStatement::new(...))) directly instead of binding let plan and then returning Ok(plan).
Done, returns Ok(LogicalPlan::Dml(...)) directly now.
    .assignments
    .into_iter()
    .map(|assign| {
        let col_name = match &assign.target {
Small follow-up suggestion: the update-column extraction and insert-column extraction both walk an ObjectName and take the last identifier. It might be worth adding a small private helper that returns Result<String> and produces a planner error for non-ident parts, rather than using unwrap() at each call site.
Done — extracted ident_from_object_name_last as a private associated function that returns Result<String> and errors on non-ident parts. Both UPDATE and INSERT column extraction now go through it.
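The shape of such a helper might look like the sketch below. It models a dotted name with a hypothetical `NamePart` enum instead of the parser's real `ObjectName` type, and `last_ident` is an illustrative name, so treat it as an outline of the idea rather than the code in the PR.

```rust
/// Hypothetical model of one part of a dotted SQL name.
#[derive(Debug, Clone, PartialEq)]
pub enum NamePart {
    /// A plain identifier such as `val` in `t.val`.
    Identifier(String),
    /// Any part that is not a plain identifier.
    Other(String),
}

/// Returns the last identifier of a dotted name, or an error instead of
/// panicking when the name is empty or ends in a non-identifier part.
pub fn last_ident(parts: &[NamePart]) -> Result<String, String> {
    match parts.last() {
        Some(NamePart::Identifier(name)) => Ok(name.clone()),
        Some(NamePart::Other(text)) => {
            Err(format!("expected an identifier, found '{text}'"))
        }
        None => Err("empty object name".to_string()),
    }
}
```

Returning `Result<String, String>` lets both call sites propagate the failure with `?` rather than calling `unwrap()`.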
cb89e53 to 6f0dfe1
@wirybeaver Thanks for the updates here. I think a couple of identifier normalization details still need to be tightened up before this is ready.
    // `MERGE INTO target AS t`. Fall back to the table reference itself.
    let target_qualifier = target_alias
        .as_ref()
        .map(|a| TableReference::bare(a.name.value.clone()))
Nice fix to use the target alias as the schema qualifier here. One remaining issue is that this uses a.name.value.clone() directly, so unquoted aliases are not normalized the same way as the rest of planning.
For example, MERGE INTO target AS T ... ON T.id = s.id can store the target qualifier as T, while expression planning normalizes T.id to t.id. That can still make alias-qualified target references fail.
Could we build the alias qualifier with self.ident_normalizer.normalize(a.name.clone()) before passing it to TableReference::bare(...)?
Good catch — now using self.ident_normalizer.normalize(a.name.clone()) before passing to TableReference::bare, so unquoted aliases like T are lowercased to match expression planning.
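To illustrate why the raw alias fails to resolve, here is a small model of identifier normalization. The `Ident` struct, `normalize`, and `resolves` are hypothetical stand-ins written for this sketch, and the rule shown (unquoted identifiers are lowercased, quoted ones are kept verbatim) is an assumption about the default behavior, not a copy of the planner's code.

```rust
/// Hypothetical model of a parsed SQL identifier.
#[derive(Debug, Clone)]
pub struct Ident {
    pub value: String,
    /// `Some(quote_char)` when the identifier was written with quotes.
    pub quote_style: Option<char>,
}

/// Assumed normalization rule: lowercase unquoted identifiers and keep
/// quoted identifiers exactly as written.
pub fn normalize(ident: &Ident) -> String {
    match ident.quote_style {
        Some(_) => ident.value.clone(),
        None => ident.value.to_ascii_lowercase(),
    }
}

/// A reference such as `T.id` resolves only if the stored schema qualifier
/// equals the normalized form of the qualifier used in the expression.
pub fn resolves(stored_qualifier: &str, reference_qualifier: &Ident) -> bool {
    stored_qualifier == normalize(reference_qualifier)
}
```

With `MERGE INTO target AS T`, storing the raw value `T` as the qualifier makes `resolves` return false for a reference written as `T.id`, while storing the normalized `t` makes it return true.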
    let columns: Vec<String> = insert_expr
        .columns
        .iter()
        .map(|c| Self::ident_from_object_name_last(c))
I think the MERGE INSERT column handling still needs one more normalization step. The extracted column names currently keep the raw Ident.value from ident_from_object_name_last, so validation happens before SQL identifier normalization.
That means something like MERGE ... INSERT (ID) VALUES (...) can be rejected against a target column named id, and duplicate detection may not match regular INSERT behavior.
Could we normalize the extracted column identifiers before duplicate checks and before the field_with_unqualified_name lookup, matching regular INSERT planning?
Fixed — both UPDATE and INSERT column names are now normalized through self.ident_normalizer.normalize(Ident::new(...)) before duplicate checks and schema lookups, matching regular INSERT planning behavior.
Implement merge_to_plan and merge_clause_to_plan in SQL planner:
- Parse Statement::Merge into LogicalPlan::Dml with WriteOp::MergeInto
- Resolve target table and plan source (USING clause) as LogicalPlan
- Build combined schema for target + source to resolve ON and WHEN expressions
- Convert ON condition and WHEN clauses to DataFusion Expr
- Handle UPDATE, INSERT, and DELETE actions in WHEN clauses
Add physical planner dispatch for WriteOp::MergeInto:
- Use source_as_provider() to recover the TableProvider from the TableSource
- Extract source ExecutionPlan from children
- Call TableProvider::merge_into with source plan, ON condition, and clauses
- Wrap errors with MERGE INTO operation context
Wire MergeInto's expressions through LogicalPlan tree-traversal so optimizers can rewrite them: add MergeIntoOp::exprs() (stable iteration order: on, then per-clause predicate + action value Exprs) and MergeIntoOp::with_new_exprs() to rebuild the op from a transformed expr vector. Branch LogicalPlan::apply_expressions, map_expressions, and with_new_exprs on WriteOp::MergeInto to use these helpers; other WriteOp variants continue to expose no expressions as before.
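The exprs()/with_new_exprs() contract described in this commit message can be sketched with a toy model in which expressions are plain strings. `MergeOp`, `Clause`, and `Action` below are hypothetical simplifications of the real types; the sketch only shows why a stable iteration order matters: the rebuild step consumes the transformed vector in exactly the order the flatten step produced it.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    /// (column, value expression) pairs.
    Update(Vec<(String, String)>),
    Insert { columns: Vec<String>, values: Vec<String> },
    Delete,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Clause {
    pub predicate: Option<String>,
    pub action: Action,
}

#[derive(Debug, Clone, PartialEq)]
pub struct MergeOp {
    pub on: String,
    pub clauses: Vec<Clause>,
}

impl MergeOp {
    /// Flattens expressions in a fixed order: ON first, then for each clause
    /// its optional predicate followed by its action value expressions.
    pub fn exprs(&self) -> Vec<&String> {
        let mut out = vec![&self.on];
        for clause in &self.clauses {
            out.extend(clause.predicate.iter());
            match &clause.action {
                Action::Update(assignments) => {
                    out.extend(assignments.iter().map(|(_, value)| value))
                }
                Action::Insert { values, .. } => out.extend(values.iter()),
                Action::Delete => {}
            }
        }
        out
    }

    /// Rebuilds the op from transformed expressions given in `exprs()` order.
    pub fn with_new_exprs(&self, new_exprs: Vec<String>) -> Result<MergeOp, String> {
        let expected = self.exprs().len();
        if new_exprs.len() != expected {
            return Err(format!(
                "expected {expected} expressions, got {}",
                new_exprs.len()
            ));
        }
        let mut iter = new_exprs.into_iter();
        let mut next = move || iter.next().expect("length checked above");

        let on = next();
        let mut clauses = Vec::with_capacity(self.clauses.len());
        for clause in &self.clauses {
            let predicate = clause.predicate.as_ref().map(|_| next());
            let action = match &clause.action {
                Action::Update(assignments) => Action::Update(
                    assignments
                        .iter()
                        .map(|(column, _)| (column.clone(), next()))
                        .collect(),
                ),
                Action::Insert { columns, values } => Action::Insert {
                    columns: columns.clone(),
                    values: values.iter().map(|_| next()).collect(),
                },
                Action::Delete => Action::Delete,
            };
            clauses.push(Clause { predicate, action });
        }
        Ok(MergeOp { on, clauses })
    }
}
```

Column names are structural and are copied unchanged; only the expression slots are replaced, which is what allows an optimizer pass to rewrite expressions without knowing the clause layout.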
6f0dfe1 to 799ed48
Took this branch for a spin against a provider-side consumer (we're building an Iceberg SCD2 upsert on this hook), and I think there's one blocking issue the current tests don't reach: any target-column reference in the
Minimal repro (custom provider registered as
    MERGE INTO t USING batch s ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET val = s.val
What's happening: the
Two directions I can see:
I'd lean toward 2, but 1 seems fine to unblock if you'd rather keep this PR small. Happy to share the test module I used - a
Which issue does this PR close?

Follow-up to #20763 (merged), which added MergeIntoOp, MergeIntoClause, and proto types.

Rationale for this change

MERGE INTO (SQL:2003) is a widely used DML statement for upsert/conditional update workloads. This PR wires the types introduced in #20763 through the SQL planner, physical planner, and TableProvider trait so that table implementations can actually execute merge operations.

What changes are included in this PR?

datafusion/catalog: TableProvider trait extension
- merge_into(source, on, clauses) async method with a default not_impl_err impl so existing providers are unaffected.

datafusion/sql: SQL → LogicalPlan
- statement.rs: parse Statement::Merge into LogicalPlan::Dml with WriteOp::MergeInto.
- Plan the USING source into a LogicalPlan.
- Build a combined target + source schema to resolve ON and WHEN expressions.
- Convert the ON condition and WHEN MATCHED / NOT MATCHED clauses to DataFusion Expr.

datafusion/expr: expression plumbing
- MergeIntoOp::exprs(): stable iteration over all expressions (ON, then per-clause predicate + action values).
- MergeIntoOp::with_new_exprs(): rebuild op from a transformed expr vector.
- Branch LogicalPlan::apply_expressions, map_expressions, and with_new_exprs on WriteOp::MergeInto so optimizers can rewrite merge expressions. Other WriteOp variants are unchanged.

datafusion/core: physical planner dispatch
- Dispatch WriteOp::MergeInto in the physical planner.
- Recover the TableProvider via source_as_provider(), extract the source ExecutionPlan, and call TableProvider::merge_into.

Are these changes tested?

- datafusion/proto/tests/cases/roundtrip_logical_plan.rs (proto round-trip for MergeInto).
- Tests for MergeIntoOp::exprs / with_new_exprs are included in dml.rs.
- There is no test against a TableProvider that implements merge_into; that is left to follow-up once a concrete provider (e.g. Delta Lake) adopts the hook.

Are there any user-facing changes?

- TableProvider gains a new merge_into method. The default implementation returns not_impl_err, so existing implementations compile without changes.
- MERGE INTO <table> USING <source> ON <cond> WHEN ... SQL syntax is now accepted by the DataFusion SQL parser and planner.