Evaluate the query in streaming mode and write to a Parquet file
Description
This allows streaming results that are larger than RAM to be written to disk.
- $lazy_sink_*() don't write directly to the output file(s) until $collect() is
  called. This is useful if you want to save a query to review or run later.
- $sink_*() write directly to the output file(s) (they are shortcuts for
  $lazy_sink_*()$collect()).
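A minimal sketch of the two workflows, assuming an existing LazyFrame lf and
hypothetical output paths:

# Immediate write: the query runs now and "direct.parquet" is created
lf$sink_parquet("direct.parquet")

# Deferred write: the sink becomes part of the plan; nothing is written
# until $collect() is called
sink_query <- lf$lazy_sink_parquet("deferred.parquet")
sink_query$collect()

The Examples section below runs the same pattern end to end on real data.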
Usage
parquet_statistics(
...,
min = TRUE,
max = TRUE,
distinct_count = TRUE,
null_count = TRUE
)
lazyframe__sink_parquet(
path,
...,
compression = c("lz4", "uncompressed", "snappy", "gzip", "lzo", "brotli", "zstd"),
compression_level = NULL,
statistics = TRUE,
row_group_size = NULL,
data_page_size = NULL,
maintain_order = TRUE,
type_coercion = TRUE,
`_type_check` = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE,
storage_options = NULL,
retries = 2,
sync_on_close = c("none", "data", "all"),
mkdir = FALSE,
engine = c("auto", "in-memory", "streaming"),
collapse_joins = deprecated()
)
lazyframe__lazy_sink_parquet(
path,
...,
compression = c("lz4", "uncompressed", "snappy", "gzip", "lzo", "brotli", "zstd"),
compression_level = NULL,
statistics = TRUE,
row_group_size = NULL,
data_page_size = NULL,
maintain_order = TRUE,
type_coercion = TRUE,
`_type_check` = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE,
storage_options = NULL,
retries = 2,
sync_on_close = c("none", "data", "all"),
mkdir = FALSE,
collapse_joins = deprecated()
)
Arguments
…
  These dots are for future extensions and must be empty.

min
  Include stats on the minimum values in the column.

max
  Include stats on the maximum values in the column.

distinct_count
  Include stats on the number of distinct values in the column.

null_count
  Include stats on the number of null values in the column.

path
  A character. File path to which the file should be written.

compression
  The compression method. Must be one of "lz4", "uncompressed", "snappy",
  "gzip", "lzo", "brotli", or "zstd".

compression_level
  NULL or integer. The level of compression to use. Only used if the method
  is one of "gzip", "brotli", or "zstd". Higher compression means smaller
  files on disk.

statistics
  Whether statistics should be written to the Parquet headers. Possible
  values include TRUE (the default), FALSE, and the output of
  parquet_statistics() (see the sketch after this argument list).

row_group_size
  Size of the row groups in number of rows. If NULL (default), the chunks of
  the DataFrame are used. Writing in smaller chunks may reduce memory
  pressure and improve writing speeds.

data_page_size
  Size of the data page in bytes. If NULL (default), it is set to 1024^2
  bytes.

maintain_order
  Maintain the order in which data is processed. Setting this to FALSE will
  be slightly faster.

type_coercion
  A logical, indicates type coercion optimization.

_type_check
  For internal use only.

predicate_pushdown
  A logical, indicates predicate pushdown optimization.

projection_pushdown
  A logical, indicates projection pushdown optimization.

simplify_expression
  A logical, indicates simplify expression optimization.

slice_pushdown
  A logical, indicates slice pushdown optimization.

no_optimization
  A logical. If TRUE, turn off (certain) optimizations.

storage_options
  Named vector containing options that indicate how to connect to a cloud
  provider. The cloud providers currently supported are AWS, GCP, and Azure.
  See the cloud provider's documentation for the supported keys. If
  storage_options is not provided, Polars will try to infer the information
  from environment variables.

retries
  Number of retries if accessing a cloud instance fails.

sync_on_close
  Sync to disk before closing a file. Must be one of "none" (default),
  "data", or "all".

mkdir
  Recursively create all the directories in the path.

engine
  The engine name to use for processing the query. One of "auto" (default),
  "in-memory", or "streaming".

collapse_joins
  Deprecated; use predicate_pushdown instead.
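A minimal sketch of selecting which statistics to write by passing a
parquet_statistics() list as the statistics argument, assuming an existing
LazyFrame lf and a hypothetical output path:

# Write only min/max statistics, skipping distinct and null counts
lf$sink_parquet(
  "stats.parquet",
  statistics = parquet_statistics(
    min = TRUE,
    max = TRUE,
    distinct_count = FALSE,
    null_count = FALSE
  ),
  mkdir = TRUE  # create any missing directories in the path
)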
Value
- $sink_*() returns NULL invisibly.
- $lazy_sink_*() returns a new LazyFrame.
Examples
library("polars")
# Sink table 'mtcars' from memory to a Parquet file
tmpf <- tempfile()
as_polars_lf(mtcars)$sink_parquet(tmpf)
# Create a query that can be run in streaming mode end-to-end
tmpf2 <- tempfile()
lf <- pl$scan_parquet(tmpf)$select(pl$col("cyl") * 2)$lazy_sink_parquet(tmpf2)
lf$explain() |>
cat()
#> SINK (file)
#> SELECT [[(col("cyl")) * (2.0)]]
#> Parquet SCAN [/tmp/Rtmp61Zwmp/file5c745401dd63]
#> PROJECT 1/11 COLUMNS
# Execute the query: the result is written to tmpf2, so only an empty
# DataFrame is returned
lf$collect()
#> shape: (0, 0)
#> ┌┐
#> ╞╡
#> └┘

# Read the new file back in to check the result
pl$scan_parquet(tmpf2)$collect()
#> shape: (32, 1)
#> ┌──────┐
#> │ cyl │
#> │ --- │
#> │ f64 │
#> ╞══════╡
#> │ 12.0 │
#> │ 12.0 │
#> │ 8.0 │
#> │ 12.0 │
#> │ 16.0 │
#> │ … │
#> │ 8.0 │
#> │ 16.0 │
#> │ 12.0 │
#> │ 16.0 │
#> │ 8.0 │
#> └──────┘