[python] Add multimodal blob store API #8432
leaves12138
left a comment
Thanks for adding the blob store facade. I found one blocker around the overwrite contract and one smaller limit edge case.
The blocker is that put_object / put_objects implement overwrite as delete_by_predicate(keys) plus an append in the same commit. When two writers concurrently create the same previously-absent key, both delete phases see no existing row, both commits can succeed, and the table is left with two rows for the same object key. After that, head_object / get_object fail with ValueError: Multiple rows found for blob key ..., so a normal concurrent create can corrupt the object namespace.
I reproduced this on the PR head by preparing two batch write builders for the same absent key, each doing delete_by_predicate(store._keys_predicate(["k"])) and writing a replacement row, then committing both. Both commits succeeded and table.scan().to_list() returned two rows for k; store.get_object("k") then failed with the duplicate-key error. Please add conflict/retry/unique-key protection for same-key concurrent puts (or otherwise make this limitation explicit and recoverable), and add a regression test for the absent-key race.
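The absent-key race described above can be illustrated with a toy in-memory model. This is only a sketch: `ToyTable`, `prepare_put`, and the commit semantics here are hypothetical stand-ins and not the Paimon batch write builder or commit protocol. It assumes each writer plans its delete phase against the snapshot it saw and that commits perform no conflict detection.

```python
# Toy in-memory model of the absent-key race; not the Paimon commit protocol.
class ToyTable:
    def __init__(self):
        self.rows = []  # committed rows as (key, payload) tuples

    def snapshot(self):
        return list(self.rows)

    def commit(self, deleted, appended):
        # No conflict detection: drop the rows the writer planned to delete, then append.
        self.rows = [row for row in self.rows if row not in deleted] + appended


def prepare_put(table, key, payload):
    # The delete phase is planned against the snapshot this writer saw.
    seen = table.snapshot()
    deleted = [row for row in seen if row[0] == key]
    return deleted, [(key, payload)]


table = ToyTable()
# Both writers prepare before either commits, so both see the key as absent.
first = prepare_put(table, "k", b"a")
second = prepare_put(table, "k", b"b")
table.commit(*first)
table.commit(*second)

duplicates = [row for row in table.rows if row[0] == "k"]
print(len(duplicates))  # 2: both commits succeeded and the key is now duplicated
```

In this model neither delete phase has anything to remove, so both appends survive, which matches the two-row result seen in the reproduction.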
Smaller issue: list_objects(limit=0) currently returns the first object because the limit check happens after appending, and negative limits behave the same. Please validate limit >= 0 and return an empty list for limit == 0.
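One possible shape for the limit handling, as a minimal sketch: `take_with_limit` is a hypothetical helper, not the actual `list_objects` implementation, and it assumes the limit is always an integer. The point is only that validation and the limit check happen before anything is appended.

```python
from typing import Iterable, List


def take_with_limit(keys: Iterable[str], limit: int) -> List[str]:
    """Hypothetical helper: return at most `limit` keys, checking the limit first."""
    if limit < 0:
        raise ValueError("limit must be greater than or equal to 0.")
    result: List[str] = []
    for key in keys:
        # Check before appending so that limit == 0 yields an empty list.
        if len(result) >= limit:
            break
        result.append(key)
    return result


print(take_with_limit(["a", "b", "c"], 0))  # []
print(take_with_limit(["a", "b", "c"], 2))  # ['a', 'b']
```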
|
What a wonderful idea! The S3-like object API makes BLOB columns much easier to use |
|
Addressed the review comments in
Re-ran |
|
Updated in The implementation still keeps the post-commit uniqueness check because Re-ran |
    end = int(end_text)
    if end < start:
        raise ValueError("Range end must be greater than or equal to start.")
    length = end - start + 1
This boundary check should also clamp explicit bytes=start-end ranges: when total_length >= 0, end must
be at most total_length - 1; otherwise the ranged descriptor may read past this blob boundary into the
next blob.
Fixed in 674e73354e: explicit bytes=start-end ranges now reject start >= total_length and clamp end to total_length - 1 when the blob length is known, so the ranged descriptor cannot read into the next blob. Added coverage for bytes=10-999 returning only the tail of the current blob and for start-past-end rejection. Re-ran python -m pytest paimon-python/pypaimon/tests/multimodal_table_test.py (45 passed), compileall, and git diff --check.
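For readers following the thread, the clamping rule can be sketched roughly as below. `resolve_range` is a hypothetical helper written for illustration, not the code from the commit; it assumes a negative `total_length` means the blob length is unknown and it only handles explicit `bytes=start-end` ranges.

```python
from typing import Tuple


def resolve_range(range_header: str, total_length: int) -> Tuple[int, int]:
    """Hypothetical helper: map 'bytes=start-end' to (start, length).

    Assumption: a negative total_length means the blob length is unknown.
    """
    prefix = "bytes="
    if not range_header.startswith(prefix):
        raise ValueError("Range must start with 'bytes='.")
    start_text, sep, end_text = range_header[len(prefix):].partition("-")
    if not sep or not start_text or not end_text:
        raise ValueError("Only explicit bytes=start-end ranges are handled here.")
    start = int(start_text)
    end = int(end_text)
    if end < start:
        raise ValueError("Range end must be greater than or equal to start.")
    if total_length >= 0:
        if start >= total_length:
            raise ValueError("Range start is past the end of the blob.")
        # Clamp so the read cannot run into the next blob.
        end = min(end, total_length - 1)
    return start, end - start + 1


print(resolve_range("bytes=10-999", 16))  # (10, 6): only the tail of a 16-byte blob
```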
|
We should modify this PR based on #8435.
f8e24e9 to 6d22a1a
|
Rebased this branch on the latest Also added a stronger streaming regression test in The write path remains Re-ran |
|
Fixed the lingering Arrow upsert path in e7edfac. BlobStore now calls |
|
Follow-up fixed in 5c8ed19: removed the remaining Arrow write from the blob race regression test as well. The test now uses |
leaves12138
left a comment
Thanks for the update. The previous limit handling and absent-key duplicate cleanup are much better now, and the focused local checks pass. I still found one remaining race in the result-read path that can surface transient duplicate rows to callers, so I think this needs one more fix before merge.
        return all(counts.get(key, 0) == 1 for key in keys)

    def _put_result(self, key, snapshot_id: Optional[int]) -> PutObjectResult:
        info = self.head_object(key)
_put_result re-reads the latest table after _overwrite_rows has already returned. There is still a race window between _has_unique_rows(keys) and this head_object call: another concurrent absent-key put can append the same key in that window, so this call can raise ValueError("Multiple rows found..."). I could reproduce this with two concurrent put_object("payloads/1", ...) calls; one future intermittently fails from this line even though the other call later repairs the table. Could we keep the result read inside the same retry/replace loop, or catch duplicate/NoSuchKey here and run the replace retry before returning, so callers do not observe the transient duplicate?
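To make the first suggestion concrete, here is a rough sketch of keeping the result read inside the retry loop. Everything in it is hypothetical: `put_with_retry`, `read_rows`, and `replace_rows` are stand-ins over an in-memory list, not BlobStore internals, and the simulated interference is a single duplicate append injected right after the first replace.

```python
from typing import Callable, List, Tuple

Row = Tuple[str, bytes]


def put_with_retry(
    read_rows: Callable[[], List[Row]],
    replace_rows: Callable[[str, bytes], None],
    key: str,
    payload: bytes,
    max_attempts: int = 3,
) -> Row:
    """Hypothetical helper: replace `key`, then check and build the result from one read."""
    for _ in range(max_attempts):
        replace_rows(key, payload)
        rows = read_rows()  # single read used for both the check and the result
        matches = [row for row in rows if row[0] == key]
        if len(matches) == 1:
            return matches[0]
        # Zero or multiple rows: another writer interfered, so replace again.
    raise RuntimeError(f"Could not settle key {key!r} after {max_attempts} attempts.")


store: List[Row] = []
interference = [("payloads/1", b"other")]  # one simulated concurrent append


def read_rows() -> List[Row]:
    return list(store)


def replace_rows(key: str, payload: bytes) -> None:
    store[:] = [row for row in store if row[0] != key] + [(key, payload)]
    if interference:
        store.append(interference.pop())  # duplicate lands right after the commit


result = put_with_retry(read_rows, replace_rows, "payloads/1", b"mine")
print(result)
```

Because the uniqueness check and the returned row come from the same read, the caller in this model never sees the transient duplicate; the first attempt observes two rows and retries instead of raising.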
|
Added column update APIs in d6771c3: |
|
Addressed the remaining result-read race in 79c2a97. |
|
Added |
leaves12138
left a comment
Reviewed the latest update under the intended semantics that BlobStore does not enforce object-key uniqueness yet, and table-level/secondary unique key support can be introduced later. The result path no longer re-reads the latest snapshot after put, metadata-only updates are covered, and the focused local checks passed. No further blockers from my side.
Summary
Adds an S3-like object facade for PyPaimon multimodal BLOB columns. The API supports object-style writes, reads, listing, deletion, byte ranges, table-column projection, and Paimon-native descriptor/reference storage.
Changes
- BlobStore with put_object, put_objects, get_object, head_object, list_objects, and delete APIs.
- Blob inputs, external URI/descriptor inputs, managed blob copies, and blob-descriptor-field reference preservation.
- MultimodalTable.blobs() and export blob store result types from pypaimon.multimodal.
Testing
- python -m pytest paimon-python/pypaimon/tests/multimodal_table_test.py
- python -m compileall -q paimon-python/pypaimon/multimodal/blob_store.py paimon-python/pypaimon/tests/multimodal_table_test.py
- git diff --check