[SPARK-56510][SQL] Fix ReplaceData DML without metadata attributes not projecting out the operation column #55372
ZiyaZa wants to merge 4 commits into apache:master
Conversation
szehon-ho
left a comment
Good bug fix for the no-metadata-attributes code path in ReplaceData. The core change (introducing DataWithProjectionWritingSparkTask) is correct and well-targeted — it ensures the __row_operation column is projected out even when there are no metadata attributes. A few suggestions below.
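To picture the fix: the task must drop the trailing operation tag before a row reaches the connector's writer. The sketch below is a self-contained simplification, not the actual Spark code (the real task uses InternalRow projections); the `Row` type and column layout are illustrative.

```scala
// Simplified model: a row arrives with its data columns plus a trailing
// __row_operation tag; only the data columns may reach the writer.
case class Row(values: Vector[Any]) // stand-in for Spark's InternalRow

def projectDataColumns(rowWithTag: Row, dataColumnCount: Int): Row =
  Row(rowWithTag.values.take(dataColumnCount))

val incoming = Row(Vector(1, "a", 6)) // last value is the operation tag
val toWrite = projectDataColumns(incoming, dataColumnCount = 2)
assert(toWrite == Row(Vector(1, "a"))) // tag column projected out
```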
```scala
final val WRITE_WITHOUT_METADATA_OPERATION: Int = 5
final val WRITE_OPERATION: Int = 6
```
nit: The rename swaps both names AND integer values — WRITE_OPERATION goes from value 5 to 6. Since the values are arbitrary and only consumed by match statements in the same codebase, consider just renaming without swapping the integer values. This would avoid any (unlikely but possible) risk with code that may have hard-coded the integer values.
```scala
val operation = row.getInt(0)

operation match {
  case WRITE_OPERATION | WRITE_WITHOUT_METADATA_OPERATION =>
```
nit: Worth a brief comment here explaining why both operation types are handled. This task is only used when there are no metadata attributes, yet WRITE_OPERATION-tagged rows (carryover/update rows in MERGE, all rows in DELETE/UPDATE) still arrive because the rewrite rules tag them with WRITE_OPERATION regardless of whether metadata attributes exist. Without the comment, a reader might wonder why a no-metadata writing task handles WRITE_OPERATION.
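Something along these lines, as a hedged sketch (the constant values, object, and method shape are illustrative, not the actual Spark source):

```scala
object Ops {
  final val WRITE_WITHOUT_METADATA_OPERATION: Int = 5
  final val WRITE_OPERATION: Int = 6
}

def dispatch(operation: Int): String = operation match {
  // Both tags are handled here: even though this task only runs when the
  // table reports no metadata attributes, the rewrite rules tag carryover
  // and update rows in MERGE (and all rows in DELETE/UPDATE) with
  // WRITE_OPERATION regardless of whether metadata attributes exist.
  case Ops.WRITE_OPERATION | Ops.WRITE_WITHOUT_METADATA_OPERATION => "write"
  case other => throw new IllegalStateException(s"Unexpected operation: $other")
}
```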
```scala
package org.apache.spark.sql.connector

class GroupBasedNoMetadataDeleteFromTableSuite extends DeleteFromTableSuiteBase {
```
suggestion: Six new test files, each ~10 lines of actual code, is significant boilerplate. Consider adding a noMetadata flag to the existing suites and running them with both configurations (e.g., via a shared trait or parameterization). This would avoid class proliferation and keep the test matrix more maintainable.
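As a rough sketch of the parameterization idea (the flag, property key, and class names below are hypothetical, not taken from the PR):

```scala
// Hypothetical: a flag on the shared base suite plus a one-line override
// per variant, instead of six standalone ~10-line test files.
abstract class RowLevelOperationSuiteBase {
  // Whether the in-memory table should expose metadata attributes.
  protected def noMetadata: Boolean = false

  protected def tableProps: Map[String, String] =
    if (noMetadata) Map("no-metadata" -> "true") else Map.empty
}

abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase

class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase

class GroupBasedNoMetadataDeleteFromTableSuite extends DeleteFromTableSuiteBase {
  override protected def noMetadata: Boolean = true
}
```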
```scala
override def build(): Write = new Write with RequiresDistributionAndOrdering {
  override def requiredDistribution: Distribution = {
    Distributions.clustered(Array(PARTITION_COLUMN_REF))
override def build(): Write = if (noMetadata) {
```
question: Is it intentional that the noMetadata path bypasses RequiresDistributionAndOrdering? This exercises a different physical plan (no shuffle/sort). If the goal is just to test the no-metadata code path, consider keeping the distribution/ordering requirements so these tests cover the same physical plan shape as the existing suites.
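Concretely, the suggestion is to keep the clustering requirement in both branches, e.g. (a simplified sketch with stand-in types; the real code uses Spark's `Write`/`Distribution` interfaces, and only the metadata-attribute reporting would differ elsewhere):

```scala
// Stand-in types; Spark's actual interfaces live in
// org.apache.spark.sql.connector.write / .distributions.
trait Write
trait RequiresDistributionAndOrdering extends Write {
  def requiredDistribution: String
}

def build(noMetadata: Boolean): Write =
  // Same distribution requirement regardless of the flag, so the
  // no-metadata tests exercise the same plan shape (shuffle + sort).
  new RequiresDistributionAndOrdering {
    override val requiredDistribution = "clustered(partition)"
  }
```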
```scala
val pk = id.getInt(0)
buffer.deletes += pk
val logEntry = new GenericInternalRow(Array[Any](DELETE, pk, meta.copy(), null))
val metaCopy = if (meta != null) meta.copy() else null
```
This null guard is needed because DeltaWritingSparkTask passes null for metadata when requiredMetadataAttributes() is empty. However, the DeltaWriter API methods (delete(meta, id), update(meta, id, row), reinsert(meta, row)) don't document that meta can be null. Third-party connectors could hit the same NPE. Consider adding Javadoc on those API methods to clarify the contract.
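For example, something like this on the API (wording and trait shape are illustrative only, not the current Spark Javadoc):

```scala
trait DeltaWriter[T] {
  /**
   * Deletes a row.
   *
   * @param meta values of the metadata columns that were projected for this
   *             row, or null if the connector reports no required metadata
   *             attributes (i.e. requiredMetadataAttributes() is empty)
   * @param id values of the row ID columns
   */
  def delete(meta: T, id: T): Unit
}
```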
Let me take a look.
What changes were proposed in this pull request?
Previously, all DSv2 tests used an in-memory table that had some metadata attributes, so the code path for tables without metadata attributes was never exercised. This PR introduces a new property, `no-metadata`, for testing with an `InMemoryTable` that has no metadata attributes.

The previous implementation had a bug for `ReplaceData` plans: it used `DataWritingSparkTask` without a projection, which means the connector would receive one extra column (the `__row_operation` column) in addition to the row data to write. This PR fixes that by introducing a new writing task, `DataWithProjectionWritingSparkTask`, that projects out everything except the row data.

Additionally, the following clean-ups are done:
- Renamed `WRITE_WITH_METADATA_OPERATION` to `WRITE_OPERATION` and `WRITE_OPERATION` to `WRITE_WITHOUT_METADATA_OPERATION` to make the intention clearer. Previously, it was confusing that `DataWithProjectionWritingSparkTask` used `WRITE_WITH_METADATA_OPERATION` when there are no metadata attributes.
- Introduced `RowLevelWriteExec` as a parent of `ReplaceDataExec`/`WriteDeltaExec`, which now holds a helper `getMetricValue` for metric computation.

Why are the changes needed?
To fix a bug.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New unit tests.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6