Skip to content

Evaluate the query and call a user-defined function for every ready batch

Source code

Description

[Experimental] This allows streaming results that are larger than RAM in certain cases.

Note that this method is much slower than native sinks. Only use it if you cannot implement your logic otherwise.

<lazyframe>$sink_batches() is a shortcut for <lazyframe>$lazy_sink_batches()$collect().

Usage

<LazyFrame>$sink_batches(
  lambda,
  ...,
  chunk_size = NULL,
  maintain_order = TRUE,
  engine = c("auto", "in-memory", "streaming")
)

<LazyFrame>$lazy_sink_batches(
  lambda,
  ...,
  chunk_size = NULL,
  maintain_order = TRUE
)

Arguments

lambda A function that will receive a DataFrame as the first argument and is called for its side effects (e.g., writing to a file). If the function returns TRUE and the streaming engine is used, this signals that no more results are needed, allowing for early stopping.
... These dots are for future extensions and must be empty.
chunk_size A positive integer or NULL (default). The number of rows that are buffered before the callback is called.
maintain_order Maintain the order in which data is processed. Setting this to FALSE will be slightly faster.
engine The engine name to use for processing the query. One of the following:
  • “auto” (default): Select the engine automatically. The “in-memory” engine will be selected for most cases.
  • “in-memory”: Use the in-memory engine.
  • “streaming”: [Experimental] Use the (new) streaming engine.

Value

  • $sink_batches() returns NULL invisibly.
  • $lazy_sink_batches() returns a new LazyFrame.

Examples

library("polars")

lf <- as_polars_lf(mtcars)

# Each batch is a Polars DataFrame
lf$sink_batches(\(df) print(df), chunk_size = 10)
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ vs  ┆ am  ┆ gear ┆ carb │
#> │ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ --- ┆ ---  ┆ ---  │
#> │ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64 ┆ f64  ┆ f64  │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0  ┆ 4.0  │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0  ┆ 4.0  │
#> │ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> │ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0  ┆ 1.0  │
#> │ 18.7 ┆ 8.0 ┆ 360.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 2.0  │
#> │ 18.1 ┆ 6.0 ┆ 225.0 ┆ 105.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0  ┆ 1.0  │
#> │ 14.3 ┆ 8.0 ┆ 360.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 4.0  │
#> │ 24.4 ┆ 4.0 ┆ 146.7 ┆ 62.0  ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 2.0  │
#> │ 22.8 ┆ 4.0 ┆ 140.8 ┆ 95.0  ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 2.0  │
#> │ 19.2 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 4.0  │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ vs  ┆ am  ┆ gear ┆ carb │
#> │ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ --- ┆ ---  ┆ ---  │
#> │ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64 ┆ f64  ┆ f64  │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 17.8 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 4.0  │
#> │ 16.4 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 3.0  │
#> │ 17.3 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 3.0  │
#> │ 15.2 ┆ 8.0 ┆ 275.8 ┆ 180.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 3.0  │
#> │ 10.4 ┆ 8.0 ┆ 472.0 ┆ 205.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 4.0  │
#> │ 10.4 ┆ 8.0 ┆ 460.0 ┆ 215.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 4.0  │
#> │ 14.7 ┆ 8.0 ┆ 440.0 ┆ 230.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 4.0  │
#> │ 32.4 ┆ 4.0 ┆ 78.7  ┆ 66.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> │ 30.4 ┆ 4.0 ┆ 75.7  ┆ 52.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 2.0  │
#> │ 33.9 ┆ 4.0 ┆ 71.1  ┆ 65.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ vs  ┆ am  ┆ gear ┆ carb │
#> │ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ --- ┆ ---  ┆ ---  │
#> │ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64 ┆ f64  ┆ f64  │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.5 ┆ 4.0 ┆ 120.1 ┆ 97.0  ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0  ┆ 1.0  │
#> │ 15.5 ┆ 8.0 ┆ 318.0 ┆ 150.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 2.0  │
#> │ 15.2 ┆ 8.0 ┆ 304.0 ┆ 150.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 2.0  │
#> │ 13.3 ┆ 8.0 ┆ 350.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 4.0  │
#> │ 19.2 ┆ 8.0 ┆ 400.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 2.0  │
#> │ 27.3 ┆ 4.0 ┆ 79.0  ┆ 66.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> │ 26.0 ┆ 4.0 ┆ 120.3 ┆ 91.0  ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0  ┆ 2.0  │
#> │ 30.4 ┆ 4.0 ┆ 95.1  ┆ 113.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 5.0  ┆ 2.0  │
#> │ 15.8 ┆ 8.0 ┆ 351.0 ┆ 264.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0  ┆ 4.0  │
#> │ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0  ┆ 6.0  │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (2, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ vs  ┆ am  ┆ gear ┆ carb │
#> │ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ --- ┆ ---  ┆ ---  │
#> │ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64 ┆ f64  ┆ f64  │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 15.0 ┆ 8.0 ┆ 301.0 ┆ 335.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0  ┆ 8.0  │
#> │ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 2.0  │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
# We can stop reading the batches by returning `TRUE`:
lf$sort("cyl")$sink_batches(
  \(df) {
    print(df)

    # We want to stop if this condition is met:
    max(df[["cyl"]])$to_r_vector() > 4
  },
  chunk_size = 10
)
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ vs  ┆ am  ┆ gear ┆ carb │
#> │ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ --- ┆ ---  ┆ ---  │
#> │ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64 ┆ f64  ┆ f64  │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 22.8 ┆ 4.0 ┆ 108.0 ┆ 93.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> │ 24.4 ┆ 4.0 ┆ 146.7 ┆ 62.0  ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 2.0  │
#> │ 22.8 ┆ 4.0 ┆ 140.8 ┆ 95.0  ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 2.0  │
#> │ 32.4 ┆ 4.0 ┆ 78.7  ┆ 66.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> │ 30.4 ┆ 4.0 ┆ 75.7  ┆ 52.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 2.0  │
#> │ 33.9 ┆ 4.0 ┆ 71.1  ┆ 65.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> │ 21.5 ┆ 4.0 ┆ 120.1 ┆ 97.0  ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0  ┆ 1.0  │
#> │ 27.3 ┆ 4.0 ┆ 79.0  ┆ 66.0  ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 1.0  │
#> │ 26.0 ┆ 4.0 ┆ 120.3 ┆ 91.0  ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0  ┆ 2.0  │
#> │ 30.4 ┆ 4.0 ┆ 95.1  ┆ 113.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 5.0  ┆ 2.0  │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
#> shape: (10, 11)
#> ┌──────┬─────┬───────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │ mpg  ┆ cyl ┆ disp  ┆ hp    ┆ … ┆ vs  ┆ am  ┆ gear ┆ carb │
#> │ ---  ┆ --- ┆ ---   ┆ ---   ┆   ┆ --- ┆ --- ┆ ---  ┆ ---  │
#> │ f64  ┆ f64 ┆ f64   ┆ f64   ┆   ┆ f64 ┆ f64 ┆ f64  ┆ f64  │
#> ╞══════╪═════╪═══════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 21.4 ┆ 4.0 ┆ 121.0 ┆ 109.0 ┆ … ┆ 1.0 ┆ 1.0 ┆ 4.0  ┆ 2.0  │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0  ┆ 4.0  │
#> │ 21.0 ┆ 6.0 ┆ 160.0 ┆ 110.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 4.0  ┆ 4.0  │
#> │ 21.4 ┆ 6.0 ┆ 258.0 ┆ 110.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0  ┆ 1.0  │
#> │ 18.1 ┆ 6.0 ┆ 225.0 ┆ 105.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 3.0  ┆ 1.0  │
#> │ 19.2 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 4.0  │
#> │ 17.8 ┆ 6.0 ┆ 167.6 ┆ 123.0 ┆ … ┆ 1.0 ┆ 0.0 ┆ 4.0  ┆ 4.0  │
#> │ 19.7 ┆ 6.0 ┆ 145.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 1.0 ┆ 5.0  ┆ 6.0  │
#> │ 18.7 ┆ 8.0 ┆ 360.0 ┆ 175.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 2.0  │
#> │ 14.3 ┆ 8.0 ┆ 360.0 ┆ 245.0 ┆ … ┆ 0.0 ┆ 0.0 ┆ 3.0  ┆ 4.0  │
#> └──────┴─────┴───────┴───────┴───┴─────┴─────┴──────┴──────┘
# One use case for this function is to export larger-than-RAM data to file
# formats for which polars doesn't provide a writer out of the box.
# The example below writes a LazyFrame by batches to a CSV file for the
# sake of the example, but one could replace `write.csv()` by
# `haven::write_dta()`, `saveRDS()`, or other functions.
#
# Note that if `chunk_size` is missing, then Polars tries to compute it
# automatically. However, depending on the characteristics of the data (for
# instance very long string elements), this can lead to out-of-memory errors.
# It is therefore recommended to set `chunk_size` manually.

withr::with_tempdir({
  file_idx <- 1

  lf$sink_batches(
    \(df) {
      dest <- paste0("file_", file_idx, ".csv")
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      write.csv(as.data.frame(df), dest)
      file_idx <<- file_idx + 1
    }
  )

  cat("\nFiles in the directory:\n")
  cat(list.files("."))
  cat("\n\n")

  pl$read_csv(".")
})
#> Writing 8 rows to file_1.csv
#> Writing 8 rows to file_2.csv
#> Writing 8 rows to file_3.csv
#> Writing 8 rows to file_4.csv
#> 
#> Files in the directory:
#> file_1.csv file_2.csv file_3.csv file_4.csv

#> shape: (32, 12)
#> ┌─────┬──────┬─────┬───────┬───┬─────┬─────┬──────┬──────┐
#> │     ┆ mpg  ┆ cyl ┆ disp  ┆ … ┆ vs  ┆ am  ┆ gear ┆ carb │
#> │ --- ┆ ---  ┆ --- ┆ ---   ┆   ┆ --- ┆ --- ┆ ---  ┆ ---  │
#> │ i64 ┆ f64  ┆ i64 ┆ f64   ┆   ┆ i64 ┆ i64 ┆ i64  ┆ i64  │
#> ╞═════╪══════╪═════╪═══════╪═══╪═════╪═════╪══════╪══════╡
#> │ 1   ┆ 21.0 ┆ 6   ┆ 160.0 ┆ … ┆ 0   ┆ 1   ┆ 4    ┆ 4    │
#> │ 2   ┆ 21.0 ┆ 6   ┆ 160.0 ┆ … ┆ 0   ┆ 1   ┆ 4    ┆ 4    │
#> │ 3   ┆ 22.8 ┆ 4   ┆ 108.0 ┆ … ┆ 1   ┆ 1   ┆ 4    ┆ 1    │
#> │ 4   ┆ 21.4 ┆ 6   ┆ 258.0 ┆ … ┆ 1   ┆ 0   ┆ 3    ┆ 1    │
#> │ 5   ┆ 18.7 ┆ 8   ┆ 360.0 ┆ … ┆ 0   ┆ 0   ┆ 3    ┆ 2    │
#> │ …   ┆ …    ┆ …   ┆ …     ┆ … ┆ …   ┆ …   ┆ …    ┆ …    │
#> │ 4   ┆ 30.4 ┆ 4   ┆ 95.1  ┆ … ┆ 1   ┆ 1   ┆ 5    ┆ 2    │
#> │ 5   ┆ 15.8 ┆ 8   ┆ 351.0 ┆ … ┆ 0   ┆ 1   ┆ 5    ┆ 4    │
#> │ 6   ┆ 19.7 ┆ 6   ┆ 145.0 ┆ … ┆ 0   ┆ 1   ┆ 5    ┆ 6    │
#> │ 7   ┆ 15.0 ┆ 8   ┆ 301.0 ┆ … ┆ 0   ┆ 1   ┆ 5    ┆ 8    │
#> │ 8   ┆ 21.4 ┆ 4   ┆ 121.0 ┆ … ┆ 1   ┆ 1   ┆ 4    ┆ 2    │
#> └─────┴──────┴─────┴───────┴───┴─────┴─────┴──────┴──────┘
# The number of rows in each chunk can be adjusted with `chunk_size`.
withr::with_tempdir({
  file_idx <- 1

  lf$sink_batches(
    \(df) {
      dest <- paste0("file_", file_idx, ".csv")
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      write.csv(as.data.frame(df), dest)
      file_idx <<- file_idx + 1
    }
  )

  cat("\nFiles in the directory:\n")
  cat(list.files("."))
})
#> Writing 8 rows to file_1.csv
#> Writing 8 rows to file_2.csv
#> Writing 8 rows to file_3.csv
#> Writing 8 rows to file_4.csv
#> 
#> Files in the directory:
#> file_1.csv file_2.csv file_3.csv file_4.csv
# To avoid manually creating paths and incrementing `file_idx` in the
# anonymous function, we can use function factories:
withr::with_tempdir({
  writer_factory <- function(dir) {
    i <- 0
    function(df) {
      i <<- i + 1
      dest <- file.path(dir, sprintf("%03d.csv", i))
      cat(sprintf("Writing %s rows to %s\n", nrow(df), dest))
      as.data.frame(df) |>
        write.csv(dest, row.names = FALSE)
    }
  }

  writer <- writer_factory(".")

  lf$sink_batches(writer, chunk_size = 10)

  cat("\nFiles in the directory:\n")
  cat(list.files("."))
})
#> Writing 10 rows to ./001.csv
#> Writing 10 rows to ./002.csv
#> Writing 10 rows to ./003.csv
#> Writing 2 rows to ./004.csv
#> 
#> Files in the directory:
#> 001.csv 002.csv 003.csv 004.csv