Skip to content
140 changes: 128 additions & 12 deletions crates/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,14 @@ impl PyDataFrame {
Ok(Self::new(df))
}

/// Apply window function expressions to the DataFrame
#[pyo3(signature = (*exprs))]
fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
let window_exprs = exprs.into_iter().map(|e| e.into()).collect();
let df = self.df.as_ref().clone().window(window_exprs)?;
Ok(Self::new(df))
}

fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> {
let df = self.df.as_ref().clone().filter(predicate.into())?;
Ok(Self::new(df))
Expand Down Expand Up @@ -804,9 +812,30 @@ impl PyDataFrame {
}

/// Print the query plan
#[pyo3(signature = (verbose=false, analyze=false))]
fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyDataFusionResult<()> {
let df = self.df.as_ref().clone().explain(verbose, analyze)?;
#[pyo3(signature = (verbose=false, analyze=false, format=None))]
fn explain(
&self,
py: Python,
verbose: bool,
analyze: bool,
format: Option<&str>,
) -> PyDataFusionResult<()> {
let explain_format = match format {
Some(f) => f
.parse::<datafusion::common::format::ExplainFormat>()
.map_err(|_| {
PyDataFusionError::Common(format!(
"Invalid explain format: '{}'. Valid options: indent, tree, pgjson, graphviz",
f
))
})?,
None => datafusion::common::format::ExplainFormat::Indent,
};
let opts = datafusion::logical_expr::ExplainOption::default()
.with_verbose(verbose)
.with_analyze(analyze)
.with_format(explain_format);
let df = self.df.as_ref().clone().explain_with_options(opts)?;
print_dataframe(py, df)
}

Expand Down Expand Up @@ -875,11 +904,14 @@ impl PyDataFrame {
Ok(Self::new(new_df))
}

#[pyo3(signature = (column, preserve_nulls=true))]
fn unnest_column(&self, column: &str, preserve_nulls: bool) -> PyDataFusionResult<Self> {
// TODO: expose RecursionUnnestOptions
// REF: https://github.com/apache/datafusion/pull/11577
let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
#[pyo3(signature = (column, preserve_nulls=true, recursions=None))]
fn unnest_column(
&self,
column: &str,
preserve_nulls: bool,
recursions: Option<Vec<(String, String, usize)>>,
) -> PyDataFusionResult<Self> {
let unnest_options = build_unnest_options(preserve_nulls, recursions);
let df = self
.df
.as_ref()
Expand All @@ -888,15 +920,14 @@ impl PyDataFrame {
Ok(Self::new(df))
}

#[pyo3(signature = (columns, preserve_nulls=true))]
#[pyo3(signature = (columns, preserve_nulls=true, recursions=None))]
fn unnest_columns(
&self,
columns: Vec<String>,
preserve_nulls: bool,
recursions: Option<Vec<(String, String, usize)>>,
) -> PyDataFusionResult<Self> {
// TODO: expose RecursionUnnestOptions
// REF: https://github.com/apache/datafusion/pull/11577
let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
let unnest_options = build_unnest_options(preserve_nulls, recursions);
let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
let df = self
.df
Expand All @@ -922,6 +953,71 @@ impl PyDataFrame {
Ok(Self::new(new_df))
}

/// Calculate the set difference with deduplication
fn except_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
let new_df = self
.df
.as_ref()
.clone()
.except_distinct(py_df.df.as_ref().clone())?;
Ok(Self::new(new_df))
}

/// Calculate the intersection with deduplication
fn intersect_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
let new_df = self
.df
.as_ref()
.clone()
.intersect_distinct(py_df.df.as_ref().clone())?;
Ok(Self::new(new_df))
}

/// Union two DataFrames matching columns by name
fn union_by_name(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
let new_df = self
.df
.as_ref()
.clone()
.union_by_name(py_df.df.as_ref().clone())?;
Ok(Self::new(new_df))
}

/// Union two DataFrames by name with deduplication
fn union_by_name_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
let new_df = self
.df
.as_ref()
.clone()
.union_by_name_distinct(py_df.df.as_ref().clone())?;
Ok(Self::new(new_df))
}

/// Deduplicate rows based on specific columns, keeping the first row per group
fn distinct_on(
&self,
on_expr: Vec<PyExpr>,
select_expr: Vec<PyExpr>,
sort_expr: Option<Vec<PySortExpr>>,
) -> PyDataFusionResult<Self> {
let on_expr = on_expr.into_iter().map(|e| e.into()).collect();
let select_expr = select_expr.into_iter().map(|e| e.into()).collect();
let sort_expr = sort_expr.map(to_sort_expressions);
let df = self
.df
.as_ref()
.clone()
.distinct_on(on_expr, select_expr, sort_expr)?;
Ok(Self::new(df))
}

/// Sort by column expressions with ascending order and nulls last
fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
let exprs = exprs.into_iter().map(|e| e.into()).collect();
let df = self.df.as_ref().clone().sort_by(exprs)?;
Ok(Self::new(df))
}

/// Write a `DataFrame` to a CSV file.
fn write_csv(
&self,
Expand Down Expand Up @@ -1295,6 +1391,26 @@ impl PyDataFrameWriteOptions {
}
}

fn build_unnest_options(
preserve_nulls: bool,
recursions: Option<Vec<(String, String, usize)>>,
) -> UnnestOptions {
let mut opts = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
if let Some(recs) = recursions {
opts.recursions = recs
.into_iter()
.map(
|(input, output, depth)| datafusion::common::RecursionUnnestOption {
input_column: datafusion::common::Column::from(input.as_str()),
output_column: datafusion::common::Column::from(output.as_str()),
depth,
},
)
.collect();
}
opts
}

/// Print DataFrame
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
// Get string representation of record batches
Expand Down
2 changes: 2 additions & 0 deletions python/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from .dataframe import (
DataFrame,
DataFrameWriteOptions,
ExplainFormat,
InsertOp,
ParquetColumnOptions,
ParquetWriterOptions,
Expand Down Expand Up @@ -82,6 +83,7 @@
"DataFrameWriteOptions",
"Database",
"ExecutionPlan",
"ExplainFormat",
"Expr",
"InsertOp",
"LogicalPlan",
Expand Down
Loading
Loading