[flink] Support lookup join for Chain Table #8423
| /** Prepares options for a specific branch in chain table. */ | ||
| private static Map<String, String> prepareBranchOptions( | ||
| Map<String, String> options, String branch, FileStoreTable sourceTable) { | ||
| Map<String, String> result = new HashMap<>(options); |
This still loses branch-specific table options on the SQL lookup path. BaseDataTableSource.timeTravelDisabledTable() builds a schema from the outer chain table options and calls table.copy(newSchema) before LookupFileStoreTable.create() validates the chain table. In copy(TableSchema), this helper starts from newTableSchema.options() and only restores branch and bucket, so a delta branch altered to merge-engine=partial-update or aggregation is copied back as the main table merge engine. The new validation then sees DEDUPLICATE and the unsupported delta branch is accepted. Please preserve branch-owned options from sourceTable.schema().options() (then overlay only safe dynamic scan options), and add a SQL lookup test for partial-update / aggregation delta branches.
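A minimal sketch of the suggested shape, written against plain maps rather than the real Paimon types. The class name, the extra `safeDynamicKeys` parameter, and the idea of an allow-list are assumptions for illustration only; which keys are actually safe to overlay is for the PR to decide.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; not the actual ChainGroupReadTable helper. */
final class BranchOptionsSketch {

    /**
     * @param incoming options from the outer chain table schema passed to copy()
     * @param branch name of the branch being prepared
     * @param branchOwned options from the branch's own schema (sourceTable.schema().options())
     * @param safeDynamicKeys assumed allow-list of keys that may be overlaid from incoming
     */
    static Map<String, String> prepareBranchOptions(
            Map<String, String> incoming,
            String branch,
            Map<String, String> branchOwned,
            Set<String> safeDynamicKeys) {
        // Start from the branch's own options so merge-engine, bucket, etc. survive the copy.
        Map<String, String> result = new HashMap<>(branchOwned);
        // Overlay only the allow-listed dynamic options from the outer table.
        for (Map.Entry<String, String> entry : incoming.entrySet()) {
            if (safeDynamicKeys.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        result.put("branch", branch);
        return result;
    }
}
```

With this shape, a delta branch altered to merge-engine=partial-update keeps that value after copy(TableSchema), so the later validation sees the branch's real merge engine.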
| if (CoreOptions.fromMap(wrapped.options()).isChainTable()) { | ||
| ChainGroupReadTable chainGroupReadTable = | ||
| (ChainGroupReadTable) ((FallbackReadFileStoreTable) wrapped).other(); | ||
| return new ChainTableStreamScan(chainGroupReadTable); |
For chain-table lookup this scan checkpoints the delta branch position, but the existing full-cache refresh logic still compares reader.nextSnapshotId() with context.table.snapshotManager().latestSnapshotId() / the outer table latest snapshot. Those are different snapshot sequences for chain tables. With lookup.refresh.async or lookup.refresh.full-load-threshold, the backlog calculation can be wrong and may skip the intended sync/full refresh behavior. Please either make the lookup refresh path use the delta branch snapshot manager for chain tables, or explicitly disable those backlog/full-load threshold checks for chain-table lookup.
| * constraints before returning. | ||
| */ | ||
| public static LookupFileStoreTable create(FileStoreTable table, List<String> joinKeys) { | ||
| if (CoreOptions.fromMap(table.options()).isChainTable()) { |
This check can misclassify branch tables. t$branch_delta / t$branch_snapshot may still carry chain-table.enabled=true, but FileStoreTableFactory#createChainTable returns the branch table directly when the current branch is the snapshot or delta branch, so it is not a FallbackReadFileStoreTable. A lookup join on a branch table would hit this cast and fail with ClassCastException (and the AUTO partial-lookup path is also skipped). Please gate this on the actual wrapper shape, e.g. table instanceof FallbackReadFileStoreTable && ((FallbackReadFileStoreTable) table).other() instanceof ChainGroupReadTable, and add a branch-table lookup regression test.
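To make the suggested gate concrete, here is a small self-contained sketch. The types below are stand-ins that only mirror the names used in this PR (the real classes have many more members), and `ChainShapeCheck` / `PlainBranchTable` are hypothetical names.

```java
/** Stand-in types for illustration; not the real Paimon classes. */
interface Table {}

/** Plays the role of a branch table returned directly by the factory. */
final class PlainBranchTable implements Table {}

final class ChainGroupReadTable implements Table {}

final class FallbackReadFileStoreTable implements Table {
    private final Table other;

    FallbackReadFileStoreTable(Table other) {
        this.other = other;
    }

    Table other() {
        return other;
    }
}

final class ChainShapeCheck {
    /** True only when the table really is a fallback wrapper around a chain group table. */
    static boolean isChainWrapper(Table table) {
        return table instanceof FallbackReadFileStoreTable
                && ((FallbackReadFileStoreTable) table).other() instanceof ChainGroupReadTable;
    }
}
```

Checking the shape instead of the option means a branch table that still carries chain-table.enabled=true falls through to the normal lookup path rather than reaching the cast.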
| // For chain tables, the outer table and delta branch use different snapshot sequences, | ||
| // so the backlog calculation (latestSnapshotId - nextSnapshotId) is meaningless. | ||
| // Skip the backlog check and always do async refresh. | ||
| boolean isChainTable = CoreOptions.fromMap(table.options()).isChainTable(); |
Good catch for the async backlog path, but the same cross-branch snapshot mismatch still exists in FileStoreLookupFunction#shouldDoFullLoad: it compares the outer table latest snapshot id with lookupTable.nextSnapshotId(), which for chain tables is the delta-branch checkpoint. If users enable lookup.refresh.full-load-threshold, this can trigger unnecessary full reloads or miss a real delta backlog. Please apply the same chain-table guard there, or compare against the delta branch's latest snapshot id instead.
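A rough sketch of the second option (comparing against the delta branch's latest snapshot id). The method names and the exact comparison `latest - next > threshold` are assumptions made for illustration; the real check in FileStoreLookupFunction may differ in detail.

```java
/** Hypothetical sketch of a chain-table-aware full-load decision. */
final class FullLoadCheckSketch {

    /** Picks the snapshot sequence that matches what the lookup reader actually consumes. */
    static Long referenceLatest(boolean isChainTable, Long outerLatest, Long deltaLatest) {
        return isChainTable ? deltaLatest : outerLatest;
    }

    /**
     * Assumed form of the threshold check: full load when the backlog exceeds the threshold.
     * Both ids must come from the same snapshot sequence for the subtraction to mean anything.
     */
    static boolean shouldDoFullLoad(Long latestSnapshotId, Long nextSnapshotId, long threshold) {
        if (latestSnapshotId == null || nextSnapshotId == null) {
            return false;
        }
        return latestSnapshotId - nextSnapshotId > threshold;
    }
}
```

For example, with an outer latest snapshot of 100, a delta latest of 12, a reader position of 10, and a threshold of 5, the outer id reports a backlog of 90 and forces a reload, while the delta id reports a backlog of 2 and does not.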
| } | ||
| // Verify new data is visible after refresh | ||
| List<String> refreshedResults = collectResult(query); |
This does not really verify incremental refresh on the same lookup table. The second collectResult(query) starts a new query/job and rebuilds the lookup cache from scratch, so it can pass even if FullCacheLookupTable.refresh() never picks up the new delta snapshot. Could this be changed to keep one lookup table/job alive across the delta write, or to test the lookup function/table directly so the same cache instance performs the refresh?
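The shape of test I have in mind, shown with toy stand-ins rather than the real FullCacheLookupTable (both classes below are hypothetical): one cache instance is opened, the delta branch is written, and the same instance is asked to refresh.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy delta branch: each committed snapshot is one upsert. */
final class FakeDeltaBranch {
    final List<Map<String, String>> snapshots = new ArrayList<>();

    void commit(String key, String value) {
        snapshots.add(Map.of(key, value));
    }
}

/** Toy full-cache lookup table that refreshes incrementally from its last position. */
final class FakeLookupCache {
    private final FakeDeltaBranch source;
    private final Map<String, String> rows = new HashMap<>();
    private int nextSnapshot = 0;

    FakeLookupCache(FakeDeltaBranch source) {
        this.source = source;
        refresh(); // initial load
    }

    /** Applies only the snapshots committed since the previous refresh. */
    void refresh() {
        while (nextSnapshot < source.snapshots.size()) {
            rows.putAll(source.snapshots.get(nextSnapshot++));
        }
    }

    String get(String key) {
        return rows.get(key);
    }
}
```

The test would assert the old value before refresh() and the new value after it on the same instance, so a refresh that never picks up the new delta snapshot fails instead of being masked by a rebuilt cache.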
Purpose
This PR adds support for Flink lookup join on Chain Table. When used as a dimension table, the lookup join reflects the latest state of the chain table: the most recent snapshot partition per group, combined with delta partitions that come after it.
Bug fixes in existing paths
- ChainTableStreamScan.withBucketFilter(): The bucket filter was not propagated to inner scans, nor stored for replay on local scans created in planStarting(). This caused the bucket filter to be silently dropped during streaming read.
- ChainGroupReadTable.copy() methods: Branch options were not properly prepared. scan.mode from the parent table could propagate to branch tables (conflicting with explicit snapshot pinning), and bucket values could be lost during copy(TableSchema). Added prepareBranchOptions() to consistently handle branch name, bucket restoration, and chain-table-level directive stripping across all copy methods.
- ChainTableUtils: Added validateChainTableForIncrementalRead() to centralize validation that the delta branch uses the DEDUPLICATE merge engine for incremental read paths (streaming read and lookup join).
- ChainKeyValueFileReaderFactory.getDataSchema(): A missing return statement caused fall-through to incorrect branch logic when the file belongs to the current branch.
New lookup join capabilities
- LookupFileStoreTable: Added chain table awareness. When the wrapped table is a chain table, newStreamScan() returns ChainTableStreamScan. Added validation to reject partition keys in the join condition and unsupported cache modes.
- ChainTableStreamScan: Added pinnedOptions() helper to set both scan.snapshot-id and scan.mode=from-snapshot when pinning branch tables, preventing conflicts with scan.mode=latest-full inherited from the lookup join path.
- ChainSplit: Added toString() for debugging.
Tests
Added comprehensive test coverage in FlinkChainTableITCase, including basic lookup join, bucket shuffle, delta-only/snapshot-only scenarios, constraint validation, incremental refresh, checkpoint/restore, predicate pushdown, and merge engine rejection.
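For readers unfamiliar with the withBucketFilter() fix listed under bug fixes, the pattern is roughly "propagate now, remember for later". The sketch below uses stand-in classes and java.util.function.Predicate instead of the real Paimon scan and filter types; `InnerScanStub`, `ChainScanSketch`, and `newLocalScan()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Stand-in for an inner scan; not the Paimon interface. */
final class InnerScanStub {
    Predicate<Integer> bucketFilter = bucket -> true; // accepts every bucket by default

    InnerScanStub withBucketFilter(Predicate<Integer> filter) {
        this.bucketFilter = filter;
        return this;
    }
}

/** Stand-in for the chain scan that wraps several inner scans. */
final class ChainScanSketch {
    private final List<InnerScanStub> innerScans = new ArrayList<>();
    private Predicate<Integer> bucketFilter; // remembered so it can be replayed later

    ChainScanSketch(InnerScanStub... scans) {
        innerScans.addAll(List.of(scans));
    }

    ChainScanSketch withBucketFilter(Predicate<Integer> filter) {
        this.bucketFilter = filter;
        // Propagate to the scans that already exist.
        innerScans.forEach(scan -> scan.withBucketFilter(filter));
        return this;
    }

    /** Plays the role of a local scan created later, e.g. while planning the starting point. */
    InnerScanStub newLocalScan() {
        InnerScanStub local = new InnerScanStub();
        if (bucketFilter != null) {
            local.withBucketFilter(bucketFilter); // replay the stored filter
        }
        return local;
    }
}
```

Without the stored field, a scan created after withBucketFilter() would silently read every bucket, which matches the symptom described above.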