Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions crates/integrations/datafusion/tests/pk_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,3 +1259,124 @@ async fn test_pk_first_row_insert_overwrite() {
"After second OVERWRITE: still 2 files (no stale level-0 files accumulated)"
);
}

// ======================= Postpone Bucket (bucket = -2) =======================

/// Postpone bucket files are invisible to normal SELECT but visible via scan_all_files.
#[tokio::test]
async fn test_postpone_write_invisible_to_select() {
    let (_tmp, catalog) = create_test_env();
    let handler = create_handler(catalog.clone());
    handler
        .sql("CREATE SCHEMA paimon.test_db")
        .await
        .expect("CREATE SCHEMA failed");

    // Postpone mode is selected via the special bucket value -2.
    handler
        .sql(
            "CREATE TABLE paimon.test_db.t_postpone (
                id INT NOT NULL, value INT,
                PRIMARY KEY (id)
            ) WITH ('bucket' = '-2')",
        )
        .await
        .unwrap();

    // Write three rows; in postpone mode they land in the postpone bucket.
    let insert = handler
        .sql("INSERT INTO paimon.test_db.t_postpone VALUES (1, 10), (2, 20), (3, 30)")
        .await
        .unwrap();
    insert.collect().await.unwrap();

    // A raw scan over all files (including postpone buckets) must see the write.
    let tbl = catalog
        .get_table(&Identifier::new("test_db", "t_postpone"))
        .await
        .unwrap();
    let scan = tbl
        .new_read_builder()
        .new_scan()
        .with_scan_all_files()
        .plan()
        .await
        .unwrap();
    let postpone_files: usize = scan
        .splits()
        .iter()
        .map(|split| split.data_files().len())
        .sum();
    assert_eq!(postpone_files, 1, "scan_all_files should find 1 postpone file");

    // The regular read path must not surface data that has no real bucket yet.
    let visible_rows = row_count(&handler, "SELECT * FROM paimon.test_db.t_postpone").await;
    assert_eq!(visible_rows, 0, "SELECT should return 0 rows for postpone table");
}

/// INSERT OVERWRITE on a postpone table should replace old files with new ones.
#[tokio::test]
async fn test_postpone_insert_overwrite() {
    let (_tmp, catalog) = create_test_env();
    let handler = create_handler(catalog.clone());
    handler
        .sql("CREATE SCHEMA paimon.test_db")
        .await
        .expect("CREATE SCHEMA failed");

    handler
        .sql(
            "CREATE TABLE paimon.test_db.t_postpone_ow (
                id INT NOT NULL, value INT,
                PRIMARY KEY (id)
            ) WITH ('bucket' = '-2')",
        )
        .await
        .unwrap();

    // Seed the table with an initial postpone-bucket file.
    let first_write = handler
        .sql("INSERT INTO paimon.test_db.t_postpone_ow VALUES (1, 10), (2, 20)")
        .await
        .unwrap();
    first_write.collect().await.unwrap();

    let tbl_before = catalog
        .get_table(&Identifier::new("test_db", "t_postpone_ow"))
        .await
        .unwrap();
    let scan_before = tbl_before
        .new_read_builder()
        .new_scan()
        .with_scan_all_files()
        .plan()
        .await
        .unwrap();
    let files_before = scan_before
        .splits()
        .iter()
        .fold(0usize, |acc, split| acc + split.data_files().len());
    assert_eq!(files_before, 1, "After INSERT: 1 postpone file");

    // INSERT OVERWRITE must drop the seeded file and commit a fresh one.
    let overwrite = handler
        .sql("INSERT OVERWRITE paimon.test_db.t_postpone_ow VALUES (3, 30)")
        .await
        .unwrap();
    overwrite.collect().await.unwrap();

    // Re-resolve the table so the scan observes the post-overwrite snapshot.
    let tbl_after = catalog
        .get_table(&Identifier::new("test_db", "t_postpone_ow"))
        .await
        .unwrap();
    let scan_after = tbl_after
        .new_read_builder()
        .new_scan()
        .with_scan_all_files()
        .plan()
        .await
        .unwrap();
    let files_after = scan_after
        .splits()
        .iter()
        .fold(0usize, |acc, split| acc + split.data_files().len());
    assert_eq!(
        files_after, 1,
        "After OVERWRITE: only 1 new file (old file deleted)"
    );
}
15 changes: 15 additions & 0 deletions crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ const BUCKET_KEY_OPTION: &str = "bucket-key";
const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type";
const BUCKET_OPTION: &str = "bucket";
const DEFAULT_BUCKET: i32 = -1;
/// Postpone bucket mode: data is written to `bucket-postpone` directory
/// and is invisible to readers until compaction assigns real bucket numbers.
/// Tables opt in via the `'bucket' = '-2'` table option.
pub const POSTPONE_BUCKET: i32 = -2;
/// Directory name for postpone bucket files.
/// [`bucket_dir_name`] returns this for [`POSTPONE_BUCKET`] instead of `bucket-{N}`.
pub const POSTPONE_BUCKET_DIR: &str = "bucket-postpone";
const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries";
const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout";
const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait";
Expand Down Expand Up @@ -63,6 +68,16 @@ pub enum MergeEngine {
FirstRow,
}

/// Format the bucket directory name for a given bucket number.
/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`.
pub fn bucket_dir_name(bucket: i32) -> String {
if bucket == POSTPONE_BUCKET {
POSTPONE_BUCKET_DIR.to_string()
} else {
format!("bucket-{bucket}")
}
}

/// Typed accessors for common table options.
///
/// This mirrors pypaimon's `CoreOptions` pattern while staying lightweight.
Expand Down
10 changes: 6 additions & 4 deletions crates/paimon/src/table/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
use crate::arrow::format::{create_format_writer, FormatFileWriter};
use crate::io::FileIO;
use crate::spec::stats::BinaryTableStats;
use crate::spec::{DataFileMeta, EMPTY_SERIALIZED_ROW};
use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW};
use crate::Result;
use arrow_array::RecordBatch;
use chrono::Utc;
Expand Down Expand Up @@ -133,11 +133,13 @@ impl DataFileWriter {
);

let bucket_dir = if self.partition_path.is_empty() {
format!("{}/bucket-{}", self.table_location, self.bucket)
format!("{}/{}", self.table_location, bucket_dir_name(self.bucket))
} else {
format!(
"{}/{}/bucket-{}",
self.table_location, self.partition_path, self.bucket
"{}/{}/{}",
self.table_location,
self.partition_path,
bucket_dir_name(self.bucket)
)
};
self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
Expand Down
57 changes: 34 additions & 23 deletions crates/paimon/src/table/kv_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ pub(crate) struct KeyValueWriteConfig {
pub sequence_field_indices: Vec<usize>,
/// Merge engine for deduplication.
pub merge_engine: MergeEngine,
/// Column index in user schema that provides the row kind value.
/// Resolved from: `rowkind.field` option > `_VALUE_KIND` column > None (all INSERT).
pub value_kind_col_index: Option<usize>,
}

impl KeyValueFileWriter {
Expand Down Expand Up @@ -200,23 +197,8 @@ impl KeyValueFileWriter {
let min_key = self.extract_key_binary_row(&combined, first_row)?;
let max_key = self.extract_key_binary_row(&combined, last_row)?;

// Build physical schema (thin-mode): [_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...]
let user_fields = user_schema.fields();
let mut physical_fields: Vec<Arc<ArrowField>> = Vec::new();
physical_fields.push(Arc::new(ArrowField::new(
SEQUENCE_NUMBER_FIELD_NAME,
ArrowDataType::Int64,
false,
)));
physical_fields.push(Arc::new(ArrowField::new(
VALUE_KIND_FIELD_NAME,
ArrowDataType::Int8,
false,
)));
for field in user_fields.iter() {
physical_fields.push(field.clone());
}
let physical_schema = Arc::new(ArrowSchema::new(physical_fields));
// Build physical schema and open writer.
let physical_schema = build_physical_schema(&user_schema);

// Open parquet writer.
let file_name = format!(
Expand Down Expand Up @@ -262,8 +244,13 @@ impl KeyValueFileWriter {
},
)?,
);
// Value kind column.
match self.config.value_kind_col_index {
// Value kind column — resolve from batch schema.
let vk_idx = combined
.schema()
.fields()
.iter()
.position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME);
match vk_idx {
Some(vk_idx) => {
physical_columns.push(
arrow_select::take::take(
Expand All @@ -282,8 +269,11 @@ impl KeyValueFileWriter {
physical_columns.push(Arc::new(Int8Array::from(vec![0i8; chunk_len])));
}
}
// All user columns.
// All user columns (skip _VALUE_KIND if present — already handled above).
for idx in 0..combined.num_columns() {
if Some(idx) == vk_idx {
continue;
}
physical_columns.push(
arrow_select::take::take(combined.column(idx).as_ref(), &chunk_indices, None)
.map_err(|e| crate::Error::DataInvalid {
Expand Down Expand Up @@ -459,3 +449,24 @@ impl KeyValueFileWriter {
Ok(builder.build_serialized())
}
}

/// Build the physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, user_cols (excluding _VALUE_KIND)...]
pub(crate) fn build_physical_schema(user_schema: &ArrowSchema) -> Arc<ArrowSchema> {
    // System columns always lead the physical layout.
    let system_fields = [
        Arc::new(ArrowField::new(
            SEQUENCE_NUMBER_FIELD_NAME,
            ArrowDataType::Int64,
            false,
        )),
        Arc::new(ArrowField::new(
            VALUE_KIND_FIELD_NAME,
            ArrowDataType::Int8,
            false,
        )),
    ];
    // Append user columns, dropping any _VALUE_KIND column the user schema
    // may carry — the system column above already covers it.
    let fields: Vec<Arc<ArrowField>> = system_fields
        .into_iter()
        .chain(
            user_schema
                .fields()
                .iter()
                .filter(|field| field.name() != VALUE_KIND_FIELD_NAME)
                .cloned(),
        )
        .collect();
    Arc::new(ArrowSchema::new(fields))
}
1 change: 1 addition & 0 deletions crates/paimon/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod full_text_search_builder;
pub(crate) mod global_index_scanner;
mod kv_file_reader;
mod kv_file_writer;
mod postpone_file_writer;
mod read_builder;
pub(crate) mod rest_env;
pub(crate) mod row_id_predicate;
Expand Down
Loading
Loading