Evaluate the query and call a user-defined function for every ready batch
Description
This allows streaming results that are larger than RAM in certain cases.
Note that this method is much slower than native sinks. Only use it if you cannot implement your logic otherwise.
<lazyframe>$sink_batches() is a shortcut for <lazyframe>$lazy_sink_batches()$collect().
Usage
<LazyFrame>$sink_batches(
lambda,
...,
chunk_size = NULL,
maintain_order = TRUE,
engine = c("auto", "in-memory", "streaming")
)
<LazyFrame>$lazy_sink_batches(
lambda,
...,
chunk_size = NULL,
maintain_order = TRUE
)
Arguments
lambda
|
A function that will receive a DataFrame as the first argument and is
called for side effects (e.g., writing to a file). If the function
returns TRUE while the streaming engine is in use, this signals
that no more results are needed, allowing for early stopping.
|
…
|
These dots are for future extensions and must be empty. |
chunk_size
|
A positive integer or NULL (default). The number of rows
that are buffered before the callback is called.
|
maintain_order
|
Maintain the order in which data is processed. Setting this to
FALSE will be slightly faster.
|
engine
|
The engine name to use for processing the query. One of the following: "auto", "in-memory", or "streaming". |
Value
- <lazyframe>$sink_batches() returns NULL invisibly.
- <lazyframe>$lazy_sink_batches() returns a new LazyFrame.
Examples
library("polars")

# Lazy view over the built-in `mtcars` data; the examples below reuse `lf`.
lf <- as_polars_lf(mtcars)

# The callback is handed one Polars DataFrame per batch (here: 10 rows each).
lf$sink_batches(
  function(batch) print(batch),
  chunk_size = 10
)
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 18.7 ┆ 8.0 ┆ 360.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 18.1 ┆ 6.0 ┆ 225.0 ┆ 105.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 14.3 ┆ 8.0 ┆ 360.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 24.4 ┆ 4.0 ┆ 146.7 ┆ 62.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 22.8 ┆ 4.0 ┆ 140.8 ┆ 95.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 19.2 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 17.8 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> │ 16.4 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 3.0 │
#> │ 17.3 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 3.0 │
#> │ 15.2 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 3.0 │
#> │ 10.4 ┆ 8.0 ┆ 472.0 ┆ 205.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 10.4 ┆ 8.0 ┆ 460.0 ┆ 215.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 14.7 ┆ 8.0 ┆ 440.0 ┆ 230.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 32.4 ┆ 4.0 ┆ 78.7 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 30.4 ┆ 4.0 ┆ 75.7 ┆ 52.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> │ 33.9 ┆ 4.0 ┆ 71.1 ┆ 65.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.5 ┆ 4.0 ┆ 120.1 ┆ 97.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 15.5 ┆ 8.0 ┆ 318.0 ┆ 150.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 15.2 ┆ 8.0 ┆ 304.0 ┆ 150.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 13.3 ┆ 8.0 ┆ 350.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> │ 19.2 ┆ 8.0 ┆ 400.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 27.3 ┆ 4.0 ┆ 79.0 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 26.0 ┆ 4.0 ┆ 120.3 ┆ 91.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> │ 30.4 ┆ 4.0 ┆ 95.1 ┆ 113.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> │ 15.8 ┆ 8.0 ┆ 351.0 ┆ 264.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 4.0 │
#> │ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 6.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (2, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 15.0 ┆ 8.0 ┆ 301.0 ┆ 335.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 8.0 │
#> │ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
# We can stop reading the batches by returning `TRUE`:
lf$sort("cyl")$sink_batches(
  function(batch) {
    print(batch)
    # The callback's return value is the stop signal: it becomes `TRUE`
    # as soon as a batch contains a `cyl` value above 4.
    largest_cyl <- max(batch[["cyl"]])$to_r_vector()
    largest_cyl > 4
  },
  chunk_size = 10
)
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 24.4 ┆ 4.0 ┆ 146.7 ┆ 62.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 22.8 ┆ 4.0 ┆ 140.8 ┆ 95.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 2.0 │
#> │ 32.4 ┆ 4.0 ┆ 78.7 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 30.4 ┆ 4.0 ┆ 75.7 ┆ 52.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> │ 33.9 ┆ 4.0 ┆ 71.1 ┆ 65.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 21.5 ┆ 4.0 ┆ 120.1 ┆ 97.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 27.3 ┆ 4.0 ┆ 79.0 ┆ 66.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 1.0 │
#> │ 26.0 ┆ 4.0 ┆ 120.3 ┆ 91.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> │ 30.4 ┆ 4.0 ┆ 95.1 ┆ 113.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 5.0 ┆ 2.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg ┆ cyl ┆ disp ┆ hp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0 ┆ 2.0 │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0 ┆ 4.0 │
#> │ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 18.1 ┆ 6.0 ┆ 225.0 ┆ 105.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0 ┆ 1.0 │
#> │ 19.2 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> │ 17.8 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0 ┆ 4.0 │
#> │ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0 ┆ 6.0 │
#> │ 18.7 ┆ 8.0 ┆ 360.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 2.0 │
#> │ 14.3 ┆ 8.0 ┆ 360.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0 ┆ 4.0 │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
# One use case for this function is to export larger-than-RAM data to file
# formats for which polars doesn't provide a writer out of the box.
# The example below writes a LazyFrame by batches to a CSV file for the
# sake of the example, but one could replace `write.csv()` by
# `haven::write_dta()`, `saveRDS()`, or other functions.
#
# Note that if `chunk_size` is missing, then Polars tries to compute it
# automatically. However, depending on the characteristics of the data (for
# instance very long string elements), this can lead to out-of-memory errors.
# It is therefore recommended to set `chunk_size` manually.
withr::with_tempdir({
  # Counter used to give every batch its own output file.
  file_idx <- 1

  # No `chunk_size` is given here, so the batch size is left to Polars.
  lf$sink_batches(
    function(batch) {
      dest <- paste0("file_", file_idx, ".csv")
      n_rows <- nrow(batch)
      cat(sprintf("Writing %s rows to %s\n", n_rows, dest))
      as.data.frame(batch) |>
        write.csv(dest)
      # Advance the counter that lives outside the callback.
      file_idx <<- file_idx + 1
    }
  )

  # List what was written, then read every CSV in the directory back.
  written_files <- paste(list.files("."), collapse = " ")
  cat("\nFiles in the directory:\n", written_files, "\n\n", sep = "")
  pl$read_csv(".")
})
#> Writing 8 rows to file_1.csv
#> Writing 8 rows to file_2.csv
#> Writing 8 rows to file_3.csv
#> Writing 8 rows to file_4.csv
#>
#> Files in the directory:
#> file_1.csv file_2.csv file_3.csv file_4.csv
#> shape: (32, 12)
#> ┌─────┬──────┬─────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ ┆ mpg ┆ cyl ┆ disp ┆ … ┆ vs ┆ am ┆ gear ┆ carb │
#> │ --- ┆ --- ┆ --- ┆ --- ┆ ┆ --- ┆ --- ┆ --- ┆ --- │
#> │ i64 ┆ f64 ┆ i64 ┆ f64 ┆ ┆ i64 ┆ i64 ┆ i64 ┆ i64 │
#> ╞═════╪══════╪═════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 1 ┆ 21.0 ┆ 6 ┆ 160.0 ┆ … ┆ 0 ┆ 1 ┆ 4 ┆ 4 │
#> │ 2 ┆ 21.0 ┆ 6 ┆ 160.0 ┆ … ┆ 0 ┆ 1 ┆ 4 ┆ 4 │
#> │ 3 ┆ 22.8 ┆ 4 ┆ 108.0 ┆ … ┆ 1 ┆ 1 ┆ 4 ┆ 1 │
#> │ 4 ┆ 21.4 ┆ 6 ┆ 258.0 ┆ … ┆ 1 ┆ 0 ┆ 3 ┆ 1 │
#> │ 5 ┆ 18.7 ┆ 8 ┆ 360.0 ┆ … ┆ 0 ┆ 0 ┆ 3 ┆ 2 │
#> │ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │
#> │ 4 ┆ 30.4 ┆ 4 ┆ 95.1 ┆ … ┆ 1 ┆ 1 ┆ 5 ┆ 2 │
#> │ 5 ┆ 15.8 ┆ 8 ┆ 351.0 ┆ … ┆ 0 ┆ 1 ┆ 5 ┆ 4 │
#> │ 6 ┆ 19.7 ┆ 6 ┆ 145.0 ┆ … ┆ 0 ┆ 1 ┆ 5 ┆ 6 │
#> │ 7 ┆ 15.0 ┆ 8 ┆ 301.0 ┆ … ┆ 0 ┆ 1 ┆ 5 ┆ 8 │
#> │ 8 ┆ 21.4 ┆ 4 ┆ 121.0 ┆ … ┆ 1 ┆ 1 ┆ 4 ┆ 2 │
#> └─────┴──────┴─────┴───────┴───┴─────┴─────┴──────┴──────┘
# The number of rows in each chunk can be adjusted with `chunk_size`.
withr::with_tempdir({
  # Counter used to give every batch its own output file.
  file_idx <- 1
  lf$sink_batches(
    \(df) {
      dest <- paste0("file_", file_idx, ".csv")
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      write.csv(as.data.frame(df), dest)
      # Advance the counter that lives outside the callback.
      file_idx <<- file_idx + 1
    },
    # Request batches of 8 rows explicitly instead of relying on the
    # automatically computed size.
    chunk_size = 8
  )
  cat("\nFiles in the directory:\n")
  cat(list.files("."))
})
#> Writing 8 rows to file_1.csv
#> Writing 8 rows to file_2.csv
#> Writing 8 rows to file_3.csv
#> Writing 8 rows to file_4.csv
#>
#> Files in the directory:
#> file_1.csv file_2.csv file_3.csv file_4.csv
# To avoid manually creating paths and incrementing `file_idx` in the
# anonymous function, we can use function factories:
withr::with_tempdir({
  # Build a batch-writing callback for `dir`. The returned closure keeps its
  # own counter, so no variable outside the factory has to be incremented.
  writer_factory <- function(dir) {
    n_written <- 0
    function(df) {
      n_written <<- n_written + 1
      file_name <- sprintf("%03d.csv", n_written)
      dest <- file.path(dir, file_name)
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      write.csv(as.data.frame(df), dest, row.names = FALSE)
    }
  }

  # Write the batches into the current (temporary) directory.
  lf$sink_batches(writer_factory("."), chunk_size = 10)

  cat("\nFiles in the directory:\n")
  cat(list.files("."))
})
#> Writing 10 rows to ./001.csv
#> Writing 10 rows to ./002.csv
#> Writing 10 rows to ./003.csv
#> Writing 2 rows to ./004.csv
#>
#> Files in the directory:
#> 001.csv 002.csv 003.csv 004.csv