feat(connectors): add MySQL source polling connector #3568
tusharagrahari wants to merge 5 commits into
Thanks for the PR. It is labeled
Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3568       +/-   ##
=============================================
- Coverage     74.11%   44.10%   -30.01%
  Complexity      937      937
=============================================
  Files          1258     1256        -2
  Lines        131485   115195    -16290
  Branches     107354    91107    -16247
=============================================
- Hits          97444    50808    -46636
- Misses        30955    61638    +30683
+ Partials       3086     2749      -337
atharvalade left a comment
Hey @tusharagrahari, thanks for the contribution and for taking the time to put this together.
A couple of process things before we dig into review:
- Issue approval: I can see you created #3445 yourself, which is great. However, per our CONTRIBUTING.md, new functionality needs maintainer approval on the issue before coding begins (a good-first-issue label or an explicit comment). Connectors are also listed as a high-risk area that needs a design discussion in the issue first. We typically ask first-time contributors to start with something labeled good-first-issue to get familiar with the repo's workflow and CI, then move on to bigger features like this.
- Failing CI: Could you take a look and get those green?
Not saying this won't get merged, the implementation looks good to start with. Just want to make sure we follow the process so there are no surprises down the line.
    if !processed_ids.is_empty() {
        self.mark_or_delete_processed_rows(pool, table, pk_column, &processed_ids)
            .await?;
    }
mark_or_delete_processed_rows runs inside poll_tables() before the messages reach the runtime's producer.send(). If the process crashes or publish fails after this point, those rows are gone from MySQL but never delivered to Iggy. It would be better to return the pending IDs in ProducedMessages and let the runtime execute the delete after confirmed send.
Valid concern, I hit this too. The delete-before-send window exists because the SDK has no post-send ack primitive; it was not a conscious design choice. I modeled it on postgres_source as the reference, but that's context, not a justification. A proper fix needs a post-send hook on the Source trait, which is an SDK-level change and out of scope for this PR.
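For illustration, here is a minimal sketch of the ordering both comments describe: poll, send, and only then delete or mark. It assumes a hypothetical post-send hook. AckingSource, PolledBatch, poll_send_commit, and InMemoryTable are invented names, not part of the Iggy SDK or this PR, and the real Source trait has no such ack path today.

```rust
/// Hypothetical batch: message payloads plus the primary keys that produced them.
#[derive(Debug, Default)]
pub struct PolledBatch {
    pub payloads: Vec<String>,
    pub pending_ids: Vec<u64>,
}

/// Hypothetical source that defers row cleanup until the send is confirmed.
pub trait AckingSource {
    /// Read rows without mutating the table.
    fn poll(&mut self) -> PolledBatch;
    /// Delete or mark the rows; called only after a confirmed send.
    fn commit(&mut self, ids: &[u64]);
}

/// Runtime-side driver: send first, commit only on success.
pub fn poll_send_commit<S, F>(source: &mut S, mut send: F) -> Result<usize, String>
where
    S: AckingSource,
    F: FnMut(&[String]) -> Result<(), String>,
{
    let batch = source.poll();
    // On a send failure we return early: rows stay untouched and are polled again.
    send(&batch.payloads)?;
    source.commit(&batch.pending_ids);
    Ok(batch.payloads.len())
}

/// In-memory table used only to exercise the ordering.
pub struct InMemoryTable {
    pub rows: Vec<(u64, String)>,
}

impl AckingSource for InMemoryTable {
    fn poll(&mut self) -> PolledBatch {
        PolledBatch {
            payloads: self.rows.iter().map(|(_, p)| p.clone()).collect(),
            pending_ids: self.rows.iter().map(|(id, _)| *id).collect(),
        }
    }

    fn commit(&mut self, ids: &[u64]) {
        self.rows.retain(|(id, _)| !ids.contains(id));
    }
}
```

With this ordering a crash between send and commit replays rows instead of dropping them, so the mode becomes at-least-once rather than at-most-once; consumers would need to tolerate duplicates.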
Hi @atharvalade, thanks for the detailed feedback!
On the process side: I did reach out on the Discord community before starting, where I got clearance to proceed with the connector work. I understand that might not substitute for a formal maintainer approval comment on the issue itself, and I'll make sure to get that explicitly documented going forward before picking up anything in high-risk areas.
On the CI failures: on it, will get those green shortly. Also working through the inline review comments now. Will push the fixes once the CI is sorted.
1c4d3d9 to dd4ea5c
/ready
hubcio left a comment
a couple of notes that don't anchor to the diff:
- core/connectors/sources/README.md source table doesn't list mysql_source yet - worth adding a row.
- the two delivery-semantics comments below (delete/mark before send, offset advance on send failure) share one root cause - there is no send ack from the runtime back to the connector - and postgres_source behaves the same way. the sink side of the exact same gap is already tracked in #2927 (consume() return value discarded) and #2928 (offsets committed before sink processing), and #2940 discusses the broader replay-safe progress contract. the source side deserves a sibling issue rather than a fix in this PR.
        .map(serde_json::Value::from)
        .unwrap_or(serde_json::Value::Null))
    }
    "DECIMAL" => {
DECIMAL columns can never be polled. sqlx checks T::compatible(type_info) before decoding, and String is not compatible with decimal types (sqlx-mysql accepts only the varchar/blob/enum family for strings), so try_get::<Option<String>> fails on every row. the workspace sqlx dep has neither bigdecimal nor rust_decimal features enabled, so there is currently no compatible type to decode into at all. since a single row error aborts the whole table batch (line 621) and the offset never advances, any table with a DECIMAL column stalls forever - and the readme documents DECIMAL as supported.
fix: enable the rust_decimal (or bigdecimal) sqlx feature and decode via that + to_string(), or use try_get_unchecked (mysql sends DECIMAL as ascii text on the wire). none of the test tables has a DECIMAL column, which is why CI is green - worth adding one.
        .map(|t| serde_json::Value::String(t.to_string()))
        .unwrap_or(serde_json::Value::Null))
    }
    "DATETIME" | "TIMESTAMP" => {
same class of bug as the DECIMAL one: NaiveDateTime in sqlx-mysql does not override compatible(), so it accepts DATETIME only, never TIMESTAMP (only DateTime<Utc>/DateTime<Local> accept both). any table with a TIMESTAMP column (created_at/updated_at - the natural tracking columns) fails every row and the table stalls permanently.
fix: decode TIMESTAMP via DateTime<Utc> and keep NaiveDateTime for DATETIME. also worth a test with a TIMESTAMP tracking column.
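A rough sketch of the dispatch these two fixes imply, with the shared "DATETIME" | "TIMESTAMP" arm split in two. DecodeAs and decode_strategy are hypothetical names for illustration; the sketch only picks a strategy per column type name and leaves the actual sqlx decoding out, since that depends on which sqlx features the workspace enables.

```rust
/// Hypothetical decode strategies; the names are illustrative, not from the PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecodeAs {
    /// DECIMAL: decode through a decimal type (e.g. rust_decimal) and stringify.
    DecimalText,
    /// DATETIME carries no zone: decode as chrono::NaiveDateTime.
    NaiveDateTime,
    /// TIMESTAMP: decode as chrono::DateTime<Utc>.
    UtcDateTime,
    /// Any type this sketch does not cover.
    Other,
}

/// Pick one strategy per MySQL column type name (case-insensitive).
pub fn decode_strategy(type_name: &str) -> DecodeAs {
    match type_name.to_ascii_uppercase().as_str() {
        "DECIMAL" => DecodeAs::DecimalText,
        // Kept as separate arms: per the review, the two types need different Rust types.
        "DATETIME" => DecodeAs::NaiveDateTime,
        "TIMESTAMP" => DecodeAs::UtcDateTime,
        _ => DecodeAs::Other,
    }
}
```

Keeping the strategy as a plain enum also makes it cheap to unit-test the mapping for every documented column type without a database.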
        .collect::<Vec<_>>()
        .join(", ");

    let query = if self.config.delete_after_read.unwrap_or(false) {
delete_after_read / processed_column commit their DELETE / UPDATE to mysql during poll, before the runtime has sent the batch to iggy (the runtime persists connector state only after a successful send, and the Source trait has no ack path). a send failure or crash in that window means rows destroyed or marked but never delivered - these modes are effectively at-most-once.
this can't be fixed connector-side without a runtime send-ack, so at minimum the readme should document both modes as at-most-once. the sink side of this gap is already tracked (#2927, #2928) and #2940 discusses the contract - the source side needs the sibling issue (postgres_source has the same behavior).
    let mut state = self.state.lock().await;
    state.processed_rows += total_processed;
    for (table, offset) in state_updates {
        state.tracking_offsets.insert(table, offset);
tracking offsets advance in memory at the end of every poll regardless of whether the runtime managed to send the batch. the connector never re-reads persisted state while running, so a transient send failure without a crash means the next poll queries past the failed batch and those rows are never read again. crash+restart recovers (disk state is older) - only the live send-fail path loses data. same runtime-ack limitation as the delete_after_read comment - source-side sibling of #2927/#2928, same follow-up issue.
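One way to picture the follow-up: stage offsets during poll and promote them only once the runtime confirms the send. This is a sketch under the assumption that an ack/nack signal exists, which it currently does not; OffsetTracker and its methods are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical tracker that separates confirmed offsets from in-flight ones.
#[derive(Debug, Default)]
pub struct OffsetTracker {
    committed: HashMap<String, String>,
    staged: HashMap<String, String>,
}

impl OffsetTracker {
    /// Record the offset reached by the batch that is about to be sent.
    pub fn stage(&mut self, table: &str, offset: &str) {
        self.staged.insert(table.to_string(), offset.to_string());
    }

    /// Send confirmed: staged offsets become the new query lower bounds.
    pub fn ack(&mut self) {
        self.committed.extend(self.staged.drain());
    }

    /// Send failed: drop staged offsets so the next poll re-reads the batch.
    pub fn nack(&mut self) {
        self.staged.clear();
    }

    /// Offset the next poll should query from, if any.
    pub fn committed(&self, table: &str) -> Option<&str> {
        self.committed.get(table).map(String::as_str)
    }
}
```

The next poll would build its query from committed() only, so a transient send failure leaves the lower bound where it was.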
    // Phase 2: commit each table independently. A table's messages are emitted
    // only after its rows are marked or deleted, so a failure here can never
    // leave a table marked-but-unpublished. `mark_or_delete_processed_rows`
this guarantee only holds within a single poll - a crash or send failure after the mark/delete commits leaves exactly a marked-but-unpublished table (the messages are published by the runtime after poll returns). reword to scope it to the mark-call failure case, otherwise it reads as a stronger guarantee than the code provides.
    // server unavailability
    1053 | 1152 | 1080 |
    // connection/network
    2006 | 2013 | 1158 | 1159 | 1160 | 1161 |
2006/2013 are client-side CR_* codes - sqlx surfaces lost connections as sqlx::Error::Io, and MySqlDatabaseError::number() only ever carries server ER_* codes, so these two can't match here. harmless, but they suggest coverage that isn't there. 1158-1161 are real server codes and fine.
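A small sketch of what the classification could look like with the client-side codes dropped and lost connections handled through the I/O variant. PollError is a hypothetical simplification (in sqlx these would be sqlx::Error::Database and sqlx::Error::Io), the code list covers only the server codes visible in the snippet above, and treating every I/O error as transient is an assumption.

```rust
/// Hypothetical simplified error shape standing in for sqlx::Error.
#[derive(Debug)]
pub enum PollError {
    /// Server-reported ER_* error number.
    Server { code: u16 },
    /// Transport-level failure such as a dropped connection.
    Io,
}

/// Decide whether a failed poll is worth retrying.
pub fn is_transient(err: &PollError) -> bool {
    match err {
        // Assumption: lost connections arrive here, not as 2006/2013.
        PollError::Io => true,
        PollError::Server { code } => matches!(
            *code,
            // server unavailability
            1053 | 1152 | 1080
            // connection/network
            | 1158 | 1159 | 1160 | 1161
        ),
    }
}
```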
        Some(self.extract_payload_column(row, i, config.payload_format)?);
    }
    if name == config.tracking_column {
        max_offset = value_as_string(&extract_column_value(row, i)?);
with the default config tracking_column == pk_column, so this loop decodes the same column twice through extract_column_value (and the non-payload path at 674-679 clones the value twice through value_as_string). extract once and reuse when the names match.
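A sketch of the "extract once and reuse" idea, assuming the loop needs both the primary key and the tracking offset per row. RowKeys and extract_keys are hypothetical, and the decode closure stands in for extract_column_value plus value_as_string.

```rust
/// Hypothetical per-row result: primary key and tracking offset as strings.
#[derive(Debug, Default)]
pub struct RowKeys {
    pub pk: Option<String>,
    pub offset: Option<String>,
}

/// Decode each relevant column at most once, even when pk and tracking coincide.
pub fn extract_keys<F>(
    names: &[&str],
    pk_column: &str,
    tracking_column: &str,
    mut decode: F,
) -> RowKeys
where
    F: FnMut(usize) -> String,
{
    let mut keys = RowKeys::default();
    for (i, name) in names.iter().enumerate() {
        let is_pk = *name == pk_column;
        let is_tracking = *name == tracking_column;
        if !is_pk && !is_tracking {
            continue;
        }
        // Single decode for this column; reused below when both roles match.
        let value = decode(i);
        if is_pk && is_tracking {
            keys.pk = Some(value.clone());
            keys.offset = Some(value);
        } else if is_pk {
            keys.pk = Some(value);
        } else {
            keys.offset = Some(value);
        }
    }
    keys
}
```

With the default config (tracking column equal to the pk column) this costs one decode and one clone per row instead of two decodes.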
    - **Mark as Processed**: Mark rows as processed using a boolean column
    - **Multiple Tables**: Monitor multiple tables simultaneously
    - **Batch Processing**: Fetch data in configurable batch sizes
    - **Offset Tracking**: Keep track of processed records to avoid duplicates
"avoid duplicates" oversells - the delete/mark modes are at-most-once (rows mutated before the iggy send is confirmed), and offset mode can either lose a batch on a live send failure or replay after a crash. a short delivery-semantics section would set expectations honestly.
    publish = false

    [package.metadata.cargo-machete]
    ignored = ["dashmap", "simd-json"]
the simd-json entry is redundant - machete detects the fully qualified simd_json:: calls (random_source uses it the same way with no ignore and passes). dashmap genuinely needs the ignore since it's only used inside the source_connector! expansion. suggest ignored = ["dashmap"].
    use crate::connectors::TestMessage;
    use serde::Deserialize;

    const TEST_MESSAGE_COUNT: usize = 3;
these consts + DatabaseRecord are byte-identical to the postgres test mod - could live next to TestMessage in the shared connectors mod. optional.
Which issue does this PR address?
Relates to #3445
Rationale
MySQL is a widely used relational database with no existing Iggy source connector. This adds incremental table polling so users can stream MySQL rows into Iggy topics without CDC infrastructure.
What changed?
Before this PR, there was no way to source data from MySQL into Iggy.
This adds a mysql_source connector plugin supporting incremental table polling via a configurable tracking column. Rows are streamed as JSON, raw bytes, or text depending on the configured payload format. Post-processing options (delete after read, mark as processed) and custom SQL queries with parameter substitution are also supported. CDC (binlog-based) is out of scope and planned as a follow-up.
Local Execution
AI Usage