Skip to content

[python] Support row-based blob upsert by key#8435

Merged
JingsongLi merged 3 commits into
apache:masterfrom
JingsongLi:codex/python-row-blob-upsert
Jul 2, 2026
Merged

[python] Support row-based blob upsert by key#8435
JingsongLi merged 3 commits into
apache:masterfrom
JingsongLi:codex/python-row-blob-upsert

Conversation

@JingsongLi

Copy link
Copy Markdown
Contributor

Summary

Support row-based Python write/upsert APIs that can accept Blob objects without materializing them into bytes. This adds TableWrite.write_row(...) and TableUpdate.upsert_by_key(rows, upsert_keys) while preserving the existing Arrow upsert behavior.

Changes

  • Add Row-to-field-name helpers and single-row partition/bucket extraction for Python writers.
  • Route DedicatedFormatWriter.write_row(...) blob columns through BlobWriter.write_blob(...) so Blob.new_input_stream() is used for streaming writes.
  • Add row-based upsert_by_key(rows, upsert_keys) with partition grouping, last-write-wins duplicate-key handling, matched-row updates, and unmatched-row appends.
  • Extend row-id updates to accept Row objects and keep Blob values as objects through blob delta writing.
  • Add streaming-only Blob tests that fail if to_data() is called, covering both write_row and batched upsert_by_key.

Testing

  • python -m compileall -q pypaimon/write pypaimon/tests/blob_table_test.py
  • python -m pytest pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_write_row_accepts_streaming_blob pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_upsert_by_key_accepts_streaming_blob_row pypaimon/tests/table_upsert_by_key_test.py pypaimon/tests/table_update_test.py

Notes

upsert_by_key accepts a collection of rows. Passing a single InternalRow is still normalized to a one-row batch internally for convenience.

@leaves12138 leaves12138 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on row-based blob upserts. The streaming-blob path itself looks useful, but I found a data-loss edge case in the new row append path that should be fixed before merge.

upsert_rows can accept rows carrying field metadata, so different rows may expose different column sets. When unmatched rows are appended, _append_rows derives all_ordered_cols only from row_items[0][1], then only requires later rows to contain that first row's columns. If a later new row contains an additional column, that column is silently omitted from the partial write and is read back as NULL.

I reproduced this on the PR head with a table (id, name, blob_data): call upsert_by_key([GenericRow([1, b'a'], [id, blob_data]), GenericRow([2, 'bob', b'b'], [id, name, blob_data])], ['id']). The second row's name='bob' is dropped and the result is {'id': 2, 'name': None, 'blob_data': b'b'}. This is especially risky because the API does not fail fast; it commits silently with lost data.

Please either require all row inputs in one append batch to have exactly the same field set, or build an explicit union schema and fill missing values intentionally. A regression test for heterogeneous GenericRow.fields in upsert_by_key would make this behavior clear.

Local checks I ran: py_compile, flake8 on the touched Python files, and the focused blob tests for write_row, row-based upsert_by_key, and partial update/rolling all passed. GitHub CI is also green at the time of review.

@JingsongLi

JingsongLi commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

Addressed the review comment in a follow-up commit:

  • Reject appended row batches whose GenericRow.fields differ, so later rows cannot silently lose extra columns.
  • Reject schema-unknown row fields on upsert_by_key.
  • Added a regression test for heterogeneous row field sets in row-based blob upsert.

Local verification:
python -m compileall -q pypaimon/write pypaimon/tests/blob_table_test.py && python -m pytest pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_write_row_accepts_streaming_blob pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_upsert_by_key_accepts_streaming_blob_row pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_upsert_by_key_rejects_heterogeneous_append_row_fields pypaimon/tests/table_upsert_by_key_test.py pypaimon/tests/table_update_test.py

Also did a small simplification so the append-field validation helper directly returns the write column list.

@JingsongLi

Copy link
Copy Markdown
Contributor Author

Added another follow-up commit for non-blob Row coverage:

  • TableWrite.write_row on append-only partitioned tables, including reordered GenericRow.fields.
  • TableWrite.write_row on fixed-bucket primary-key tables.
  • TableWrite.write_row on dynamic-bucket primary-key tables.
  • TableUpdate.upsert_by_key(rows, ...) on ordinary non-blob tables, running in both batch and stream test classes.

Local verification now covers 171 related tests:
python -m compileall -q pypaimon/write pypaimon/tests/blob_table_test.py pypaimon/tests/table_upsert_by_key_test.py pypaimon/tests/write/table_write_test.py && python -m pytest pypaimon/tests/write/table_write_test.py pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_write_row_accepts_streaming_blob pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_upsert_by_key_accepts_streaming_blob_row pypaimon/tests/blob_table_test.py::DedicatedFormatWriterTest::test_upsert_by_key_rejects_heterogeneous_append_row_fields pypaimon/tests/table_upsert_by_key_test.py pypaimon/tests/table_update_test.py

@leaves12138 leaves12138 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. The previous heterogeneous-row data-loss case is now fixed by failing fast when appended row inputs in the same row-based upsert batch do not have the same field set, and the new regression test covers it.

I re-ran the prior reproducer on this head and it now raises the expected ValueError instead of committing partial data. I also ran py_compile, flake8 on the touched Python files, the full table_upsert_by_key_test.py, the new write_row table-write tests, and the focused blob row-upsert/partial-update tests locally; all passed.

The implementation is still fairly involved, but the split between row value extraction, row-id updates, and row append writing is understandable now. No further blocker from my side.

@JingsongLi JingsongLi merged commit 2844adf into apache:master Jul 2, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants