[python] Support row-based blob upsert by key#8435
Conversation
leaves12138
left a comment
There was a problem hiding this comment.
Thanks for working on row-based blob upserts. The streaming-blob path itself looks useful, but I found a data-loss edge case in the new row append path that should be fixed before merge.
upsert_rows can accept rows carrying field metadata, so different rows may expose different column sets. When unmatched rows are appended, _append_rows derives all_ordered_cols only from row_items[0][1], then only requires later rows to contain that first row's columns. If a later new row contains an additional column, that column is silently omitted from the partial write and is read back as NULL.
I reproduced this on the PR head with a table (id, name, blob_data): call upsert_by_key([GenericRow([1, b'a'], [id, blob_data]), GenericRow([2, 'bob', b'b'], [id, name, blob_data])], ['id']). The second row's name='bob' is dropped and the result is {'id': 2, 'name': None, 'blob_data': b'b'}. This is especially risky because the API does not fail fast; it commits silently with lost data.
Please either require all row inputs in one append batch to have exactly the same field set, or build an explicit union schema and fill missing values intentionally. A regression test for heterogeneous GenericRow.fields in upsert_by_key would make this behavior clear.
Local checks I ran: py_compile, flake8 on the touched Python files, and the focused blob tests for write_row, row-based upsert_by_key, and partial update/rolling all passed. GitHub CI is also green at the time of review.
|
Addressed the review comment in a follow-up commit:
Local verification: Also did a small simplification so the append-field validation helper directly returns the write column list. |
|
Added another follow-up commit for non-blob Row coverage:
Local verification now covers 171 related tests: |
leaves12138
left a comment
There was a problem hiding this comment.
Thanks for the update. The previous heterogeneous-row data-loss case is now fixed by failing fast when appended row inputs in the same row-based upsert batch do not have the same field set, and the new regression test covers it.
I re-ran the prior reproducer on this head and it now raises the expected ValueError instead of committing partial data. I also ran py_compile, flake8 on the touched Python files, the full table_upsert_by_key_test.py, the new write_row table-write tests, and the focused blob row-upsert/partial-update tests locally; all passed.
The implementation is still fairly involved, but the split between row value extraction, row-id updates, and row append writing is understandable now. No further blocker from my side.
Summary
Support row-based Python write/upsert APIs that can accept
Blobobjects without materializing them into bytes. This addsTableWrite.write_row(...)andTableUpdate.upsert_by_key(rows, upsert_keys)while preserving the existing Arrow upsert behavior.Changes
DedicatedFormatWriter.write_row(...)blob columns throughBlobWriter.write_blob(...)soBlob.new_input_stream()is used for streaming writes.upsert_by_key(rows, upsert_keys)with partition grouping, last-write-wins duplicate-key handling, matched-row updates, and unmatched-row appends.to_data()is called, covering bothwrite_rowand batchedupsert_by_key.Testing
python -m compileall -q pypaimon/write pypaimon/tests/blob_table_test.pypython -m pytest pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_write_row_accepts_streaming_blob pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_upsert_by_key_accepts_streaming_blob_row pypaimon/tests/table_upsert_by_key_test.py pypaimon/tests/table_update_test.pyNotes
upsert_by_keyaccepts a collection of rows. Passing a singleInternalRowis still normalized to a one-row batch internally for convenience.