Evaluate the query and call a user-defined function for every ready batch
Description
This allows streaming results that are larger than RAM in certain cases.
Note that this method is much slower than native sinks. Only use it if you cannot implement your logic otherwise.
<lazyframe>$sink_batches() is a
shortcut for
<lazyframe>$lazy_sink_batches()$collect().
Usage
<LazyFrame>$sink_batches(
lambda,
...,
chunk_size = NULL,
maintain_order = TRUE,
engine = c("auto", "in-memory", "streaming"),
optimizations = pl$QueryOptFlags()
)
<LazyFrame>$lazy_sink_batches(
lambda,
...,
chunk_size = NULL,
maintain_order = TRUE
)
Arguments
Value
- `$sink_batches()` returns `NULL` invisibly.
- `$lazy_sink_batches()` returns a new LazyFrame.
Examples
library(polars)

lf <- as_polars_lf(mtcars)

# `lambda` is called once per batch, and each batch arrives as a Polars
# DataFrame. Here we simply print every batch of (at most) 10 rows.
lf$sink_batches(function(df) print(df), chunk_size = 10)
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 18.7 ┆ 8.0 ┆ 360.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 18.1 ┆ 6.0 ┆ 225.0 ┆ 105.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 14.3 ┆ 8.0 ┆ 360.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 24.4 ┆ 4.0 ┆ 146.7 ┆ 62.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 22.8 ┆ 4.0 ┆ 140.8 ┆ 95.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 19.2 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 17.8 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> │ 16.4 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 3.0 │
#> │ 17.3 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 3.0 │
#> │ 15.2 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 3.0 │
#> │ 10.4 ┆ 8.0 ┆ 472.0 ┆ 205.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 10.4 ┆ 8.0 ┆ 460.0 ┆ 215.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 14.7 ┆ 8.0 ┆ 440.0 ┆ 230.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 32.4 ┆ 4.0 ┆ 78.7 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 30.4 ┆ 4.0 ┆ 75.7 ┆ 52.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> │ 33.9 ┆ 4.0 ┆ 71.1 ┆ 65.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.5 ┆ 4.0 ┆ 120.1 ┆ 97.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 15.5 ┆ 8.0 ┆ 318.0 ┆ 150.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 15.2 ┆ 8.0 ┆ 304.0 ┆ 150.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 13.3 ┆ 8.0 ┆ 350.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 19.2 ┆ 8.0 ┆ 400.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 27.3 ┆ 4.0 ┆ 79.0 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 26.0 ┆ 4.0 ┆ 120.3 ┆ 91.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> │ 30.4 ┆ 4.0 ┆ 95.1 ┆ 113.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> │ 15.8 ┆ 8.0 ┆ 351.0 ┆ 264.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 4.0 │
#> │ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 6.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (2, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 15.0 ┆ 8.0 ┆ 301.0 ┆ 335.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 8.0 │
#> │ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
# Returning `TRUE` from `lambda` stops the reading of further batches:
lf$sort("cyl")$sink_batches(
  function(df) {
    print(df)
    # Stop as soon as a batch contains a car with more than 4 cylinders.
    max_cyl <- max(df[["cyl"]])$to_r_vector()
    max_cyl > 4
  },
  chunk_size = 10
)
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 24.4 ┆ 4.0 ┆ 146.7 ┆ 62.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 22.8 ┆ 4.0 ┆ 140.8 ┆ 95.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 32.4 ┆ 4.0 ┆ 78.7 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 30.4 ┆ 4.0 ┆ 75.7 ┆ 52.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> │ 33.9 ┆ 4.0 ┆ 71.1 ┆ 65.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 21.5 ┆ 4.0 ┆ 120.1 ┆ 97.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 27.3 ┆ 4.0 ┆ 79.0 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 26.0 ┆ 4.0 ┆ 120.3 ┆ 91.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> │ 30.4 ┆ 4.0 ┆ 95.1 ┆ 113.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 18.1 ┆ 6.0 ┆ 225.0 ┆ 105.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 19.2 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> │ 17.8 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> │ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 6.0 │
#> │ 18.7 ┆ 8.0 ┆ 360.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 14.3 ┆ 8.0 ┆ 360.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
# One use case for this function is exporting larger-than-RAM data to file
# formats for which Polars doesn't provide a writer out of the box.
# For the sake of the example, the code below writes a LazyFrame by batches
# to CSV files, but `write.csv()` could be replaced by `haven::write_dta()`,
# `saveRDS()`, or other functions.
#
# Note that if `chunk_size` is left unset (`NULL`), Polars tries to compute
# it automatically. However, depending on the characteristics of the data
# (for instance very long string elements), this can lead to out-of-memory
# errors. It is therefore recommended to set `chunk_size` manually.
withr::with_tempdir({
  file_idx <- 1
  lf$sink_batches(
    function(df) {
      dest <- sprintf("file_%s.csv", file_idx)
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      # `write.csv()` writes row names by default, which is why the data read
      # back below has an extra, unnamed first column.
      write.csv(as.data.frame(df), dest)
      file_idx <<- file_idx + 1
    }
  )

  cat("\nFiles in the directory:\n")
  cat(list.files("."))
  cat("\n\n")

  pl$read_csv(".")
})
#> Writing 8 rows to file_1.csv
#> Writing 8 rows to file_2.csv
#> Writing 8 rows to file_3.csv
#> Writing 8 rows to file_4.csv
#>
#> Files in the directory:
#> file_1.csv file_2.csv file_3.csv file_4.csv
#> shape: (32, 12)
#> ┌─────┬──────┬─────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ ┆ mpg ┆ cyl ┆ disp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ i64 ┆ f64 ┆ i64 ┆ f64 ┆ ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
#> ╞═════╪══════╪═════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 1 ┆ 21.0 ┆ 6 ┆ 160.0 ┆ … ┆ 0 ┆ 1 ┆ 4 ┆ 4 │
#> │ 2 ┆ 21.0 ┆ 6 ┆ 160.0 ┆ … ┆ 0 ┆ 1 ┆ 4 ┆ 4 │
#> │ 3 ┆ 22.8 ┆ 4 ┆ 108.0 ┆ … ┆ 1 ┆ 1 ┆ 4 ┆ 1 │
#> │ 4 ┆ 21.4 ┆ 6 ┆ 258.0 ┆ … ┆ 1 ┆ 0 ┆ 3 ┆ 1 │
#> │ 5 ┆ 18.7 ┆ 8 ┆ 360.0 ┆ … ┆ 0 ┆ 0 ┆ 3 ┆ 2 │
#> │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │
#> │ 4 ┆ 30.4 ┆ 4 ┆ 95.1 ┆ … ┆ 1 ┆ 1 ┆ 5 ┆ 2 │
#> │ 5 ┆ 15.8 ┆ 8 ┆ 351.0 ┆ … ┆ 0 ┆ 1 ┆ 5 ┆ 4 │
#> │ 6 ┆ 19.7 ┆ 6 ┆ 145.0 ┆ … ┆ 0 ┆ 1 ┆ 5 ┆ 6 │
#> │ 7 ┆ 15.0 ┆ 8 ┆ 301.0 ┆ … ┆ 0 ┆ 1 ┆ 5 ┆ 8 │
#> │ 8 ┆ 21.4 ┆ 4 ┆ 121.0 ┆ … ┆ 1 ┆ 1 ┆ 4 ┆ 2 │
#> └─────┴──────┴─────┴───────┴───┴─────┴─────┴──────┴──────┘
# The number of rows in each batch can be adjusted with `chunk_size`. With
# `chunk_size = 10`, the 32 rows of `mtcars` are written as three batches of
# 10 rows followed by a last batch of 2 rows.
withr::with_tempdir({
  file_idx <- 1
  lf$sink_batches(
    \(df) {
      dest <- paste0("file_", file_idx, ".csv")
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      write.csv(as.data.frame(df), dest)
      file_idx <<- file_idx + 1
    },
    chunk_size = 10
  )
  cat("\nFiles in the directory:\n")
  cat(list.files("."))
})
#> Writing 10 rows to file_1.csv
#> Writing 10 rows to file_2.csv
#> Writing 10 rows to file_3.csv
#> Writing 2 rows to file_4.csv
#>
#> Files in the directory:
#> file_1.csv file_2.csv file_3.csv file_4.csv
# Rather than building paths by hand and incrementing a counter from inside
# an anonymous function, a function factory can keep that state in its own
# closure:
withr::with_tempdir({
  writer_factory <- function(dir) {
    n_written <- 0
    function(df) {
      n_written <<- n_written + 1
      dest <- file.path(dir, sprintf("%03d.csv", n_written))
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      write.csv(as.data.frame(df), dest, row.names = FALSE)
    }
  }

  writer <- writer_factory(".")
  lf$sink_batches(writer, chunk_size = 10)

  cat("\nFiles in the directory:\n")
  cat(list.files("."))
})
#> Writing 10 rows to ./001.csv
#> Writing 10 rows to ./002.csv
#> Writing 10 rows to ./003.csv
#> Writing 2 rows to ./004.csv
#>
#> Files in the directory:
#> 001.csv 002.csv 003.csv 004.csv