Skip to content

[flink] Support lookup join for Chain Table#8423

Open
yunfengzhou-hub wants to merge 3 commits into
apache:masterfrom
yunfengzhou-hub:chain-table-lookup
Open

[flink] Support lookup join for Chain Table#8423
yunfengzhou-hub wants to merge 3 commits into
apache:masterfrom
yunfengzhou-hub:chain-table-lookup

Conversation

@yunfengzhou-hub

@yunfengzhou-hub yunfengzhou-hub commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Purpose

This PR adds support for Flink lookup join on Chain Table. When used as a dimension table, the lookup join reflects the latest state of the chain table: the most recent snapshot partition per group, combined with delta partitions that come after it.

Bug fixes in existing paths

  • ChainTableStreamScan.withBucketFilter(): Bucket filter was not propagated to inner scans nor stored for replay on local scans created in planStarting(). This caused bucket filter to be silently dropped during streaming read.
  • ChainGroupReadTable.copy() methods: Branch options were not properly prepared — scan.mode from the parent table could propagate to branch tables (conflicting with explicit snapshot pinning), and bucket values could be lost during copy(TableSchema). Added prepareBranchOptions() to consistently handle branch name, bucket restoration, and chain-table-level directive stripping across all copy methods.
  • ChainTableUtils: Added validateChainTableForIncrementalRead() to centralize validation that the delta branch uses DEDUPLICATE merge engine for incremental read paths (streaming read and lookup join).
  • ChainKeyValueFileReaderFactory.getDataSchema(): Missing return statement caused fall-through to incorrect branch logic when file belongs to the current branch.

New lookup join capabilities

  • LookupFileStoreTable: Added chain table awareness. When the wrapped table is a chain table, newStreamScan() returns ChainTableStreamScan. Added validation to reject partition keys in join condition and unsupported cache modes.
  • ChainTableStreamScan: Added pinnedOptions() helper to set both scan.snapshot-id and scan.mode=from-snapshot when pinning branch tables, preventing conflicts with scan.mode=latest-full inherited from the lookup join path.
  • ChainSplit: Added toString() for debugging.
  • Documentation: Added Lookup Join section to chain table docs.

Tests

Added comprehensive test coverage in FlinkChainTableITCase, including basic lookup join, bucket shuffle, delta-only/snapshot-only scenarios, constraint validation, incremental refresh, checkpoint/restore, predicate pushdown, and merge engine rejection.

@yunfengzhou-hub yunfengzhou-hub marked this pull request as ready for review July 2, 2026 05:38
/** Prepares options for a specific branch in chain table. */
private static Map<String, String> prepareBranchOptions(
Map<String, String> options, String branch, FileStoreTable sourceTable) {
Map<String, String> result = new HashMap<>(options);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still loses branch-specific table options on the SQL lookup path. BaseDataTableSource.timeTravelDisabledTable() builds a schema from the outer chain table options and calls table.copy(newSchema) before LookupFileStoreTable.create() validates the chain table. In copy(TableSchema), this helper starts from newTableSchema.options() and only restores branch and bucket, so a delta branch altered to merge-engine=partial-update or aggregation is copied back as the main table merge engine. The new validation then sees DEDUPLICATE and the unsupported delta branch is accepted. Please preserve branch-owned options from sourceTable.schema().options() (then overlay only safe dynamic scan options), and add a SQL lookup test for partial-update / aggregation delta branches.

if (CoreOptions.fromMap(wrapped.options()).isChainTable()) {
ChainGroupReadTable chainGroupReadTable =
(ChainGroupReadTable) ((FallbackReadFileStoreTable) wrapped).other();
return new ChainTableStreamScan(chainGroupReadTable);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For chain-table lookup this scan checkpoints the delta branch position, but the existing full-cache refresh logic still compares reader.nextSnapshotId() with context.table.snapshotManager().latestSnapshotId() / the outer table latest snapshot. Those are different snapshot sequences for chain tables. With lookup.refresh.async or lookup.refresh.full-load-threshold, the backlog calculation can be wrong and may skip the intended sync/full refresh behavior. Please either make the lookup refresh path use the delta branch snapshot manager for chain tables, or explicitly disable those backlog/full-load threshold checks for chain-table lookup.

* constraints before returning.
*/
public static LookupFileStoreTable create(FileStoreTable table, List<String> joinKeys) {
if (CoreOptions.fromMap(table.options()).isChainTable()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check can misclassify branch tables. t$branch_delta / t$branch_snapshot may still carry chain-table.enabled=true, but FileStoreTableFactory#createChainTable returns the branch table directly when the current branch is the snapshot or delta branch, so it is not a FallbackReadFileStoreTable. A lookup join on a branch table would hit this cast and fail with ClassCastException (and the AUTO partial-lookup path is also skipped). Please gate this on the actual wrapper shape, e.g. table instanceof FallbackReadFileStoreTable && ((FallbackReadFileStoreTable) table).other() instanceof ChainGroupReadTable, and add a branch-table lookup regression test.

// For chain tables, the outer table and delta branch use different snapshot sequences,
// so the backlog calculation (latestSnapshotId - nextSnapshotId) is meaningless.
// Skip the backlog check and always do async refresh.
boolean isChainTable = CoreOptions.fromMap(table.options()).isChainTable();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch for the async backlog path, but the same cross-branch snapshot mismatch still exists in FileStoreLookupFunction#shouldDoFullLoad: it compares the outer table latest snapshot id with lookupTable.nextSnapshotId(), which for chain tables is the delta-branch checkpoint. If users enable lookup.refresh.full-load-threshold, this can trigger unnecessary full reloads or miss a real delta backlog. Please apply the same chain-table guard there, or compare against the delta branch's latest snapshot id instead.

}

// Verify new data is visible after refresh
List<String> refreshedResults = collectResult(query);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not really verify incremental refresh on the same lookup table. The second collectResult(query) starts a new query/job and rebuilds the lookup cache from scratch, so it can pass even if FullCacheLookupTable.refresh() never picks up the new delta snapshot. Could this be changed to keep one lookup table/job alive across the delta write, or to test the lookup function/table directly so the same cache instance performs the refresh?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants