use std::path::{Path, PathBuf};
use polars_core::prelude::*;
use polars_io::csv::read::{infer_file_schema, CommentPrefix, CsvEncoding, NullValues};
use polars_io::utils::get_reader_bytes;
use polars_io::RowIndex;
use crate::prelude::*;
/// Builder for a lazy CSV scan.
///
/// Options are set through the `with_*` methods. The final `LazyFrame` is produced through the
/// `LazyFileListReader` trait implementation for this type.
#[cfg(feature = "csv")]
#[derive(Clone)]
pub struct LazyCsvReader {
    // ---- input source ----
    /// Single input path; set by `new` and `with_path` (`new_paths` leaves it empty).
    path: PathBuf,
    /// Explicit list of input paths; set by `new_paths` and `with_paths` (empty by default).
    paths: Arc<[PathBuf]>,
    /// Globbing toggle reported via `LazyFileListReader::glob` (default `true`).
    glob: bool,

    // ---- file layout ----
    /// Field separator byte (default `b','`).
    separator: u8,
    /// End-of-line byte (default `b'\n'`).
    eol_char: u8,
    /// Quote byte, or `None` to disable quoting (default `Some(b'"')`).
    quote_char: Option<u8>,
    /// Optional comment prefix, built by `with_comment_prefix`.
    comment_prefix: Option<CommentPrefix>,
    /// Whether the file has a header (default `true`).
    has_header: bool,
    /// Rows to skip; forwarded to schema inference and the scan (default `0`).
    skip_rows: usize,
    /// Rows to skip after the header; forwarded to schema inference and the scan (default `0`).
    skip_rows_after_header: usize,
    /// Text encoding (default `CsvEncoding::Utf8`).
    encoding: CsvEncoding,

    // ---- schema ----
    /// Fixed schema; set by `with_schema` or `with_schema_modify` (default `None`).
    schema: Option<SchemaRef>,
    /// Per-column dtype overrides; set by `with_dtype_overwrite` (default `None`).
    schema_overwrite: Option<SchemaRef>,
    /// Row budget for schema inference (default `Some(100)`).
    infer_schema_length: Option<usize>,

    // ---- value parsing ----
    /// Values to treat as null (default `None`).
    null_values: Option<NullValues>,
    /// Missing-field-as-null toggle (default `true`).
    missing_is_null: bool,
    /// Decimal-comma toggle (default `false`).
    decimal_comma: bool,
    /// Date-parsing toggle (default `false`).
    try_parse_dates: bool,
    /// Ragged-line truncation toggle (default `false`).
    truncate_ragged_lines: bool,

    // ---- error handling ----
    /// Error-ignoring toggle (default `false`).
    ignore_errors: bool,
    /// Raise-on-empty toggle (default `true`).
    raise_if_empty: bool,

    // ---- output / execution ----
    /// Optional row limit (default `None`).
    n_rows: Option<usize>,
    /// Optional row-index column (default `None`).
    row_index: Option<RowIndex>,
    /// Optional thread count; schema inference may update it in `with_schema_modify`.
    n_threads: Option<usize>,
    /// Low-memory toggle (default `false`).
    low_memory: bool,
    /// Rechunk toggle, also used when concatenating multiple files (default `false`).
    rechunk: bool,
    /// Cache toggle forwarded to the scan (default `true`).
    cache: bool,
}
#[cfg(feature = "csv")]
impl LazyCsvReader {
    /// Create a reader over an explicit list of files.
    ///
    /// The single `path` is left empty; `paths` is stored via `with_paths`.
    pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
        Self::new("").with_paths(paths)
    }

    /// Create a reader for a single `path` with default options.
    ///
    /// Defaults: `,` separator, header present, `"` quote char, `\n` line ending, schema
    /// inferred from up to 100 rows, UTF-8 encoding, caching and globbing enabled.
    pub fn new(path: impl AsRef<Path>) -> Self {
        LazyCsvReader {
            path: path.as_ref().to_owned(),
            paths: Arc::new([]),
            separator: b',',
            has_header: true,
            ignore_errors: false,
            skip_rows: 0,
            n_rows: None,
            cache: true,
            schema: None,
            schema_overwrite: None,
            low_memory: false,
            comment_prefix: None,
            quote_char: Some(b'"'),
            eol_char: b'\n',
            null_values: None,
            missing_is_null: true,
            infer_schema_length: Some(100),
            rechunk: false,
            skip_rows_after_header: 0,
            encoding: CsvEncoding::Utf8,
            row_index: None,
            try_parse_dates: false,
            raise_if_empty: true,
            truncate_ragged_lines: false,
            n_threads: None,
            decimal_comma: false,
            glob: true,
        }
    }

    /// Set the number of rows to skip after the header.
    #[must_use]
    pub fn with_skip_rows_after_header(mut self, offset: usize) -> Self {
        self.skip_rows_after_header = offset;
        self
    }

    /// Set (or clear with `None`) the row-index column.
    #[must_use]
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Set (or clear with `None`) the row limit.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Set the number of rows used for schema inference (`None` is forwarded as-is).
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.infer_schema_length = num_rows;
        self
    }

    /// Toggle error ignoring for the scan.
    #[must_use]
    pub fn with_ignore_errors(mut self, ignore: bool) -> Self {
        self.ignore_errors = ignore;
        self
    }

    /// Set (or clear) a fixed schema for the scan.
    #[must_use]
    pub fn with_schema(mut self, schema: Option<SchemaRef>) -> Self {
        self.schema = schema;
        self
    }

    /// Set the number of rows to skip.
    #[must_use]
    pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
        self.skip_rows = skip_rows;
        self
    }

    /// Set (or clear) per-column dtype overrides.
    #[must_use]
    pub fn with_dtype_overwrite(mut self, schema: Option<SchemaRef>) -> Self {
        self.schema_overwrite = schema;
        self
    }

    /// Declare whether the file has a header.
    #[must_use]
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    /// Set the field separator byte.
    #[must_use]
    pub fn with_separator(mut self, separator: u8) -> Self {
        self.separator = separator;
        self
    }

    /// Set (or clear) the comment prefix.
    ///
    /// A one-byte prefix is stored as `CommentPrefix::Single`; any other string (including the
    /// empty string) is stored as `CommentPrefix::Multi`.
    #[must_use]
    pub fn with_comment_prefix(mut self, comment_prefix: Option<&str>) -> Self {
        self.comment_prefix = comment_prefix.map(|s| match s.as_bytes() {
            // A one-byte UTF-8 string is always a single ASCII character.
            &[byte] => CommentPrefix::Single(byte),
            _ => CommentPrefix::Multi(Arc::from(s)),
        });
        self
    }

    /// Set the quote byte, or `None` to disable quoting.
    #[must_use]
    pub fn with_quote_char(mut self, quote: Option<u8>) -> Self {
        self.quote_char = quote;
        self
    }

    /// Set the end-of-line byte.
    #[must_use]
    pub fn with_eol_char(mut self, eol_char: u8) -> Self {
        self.eol_char = eol_char;
        self
    }

    /// Set (or clear) the values treated as null.
    #[must_use]
    pub fn with_null_values(mut self, null_values: Option<NullValues>) -> Self {
        self.null_values = null_values;
        self
    }

    /// Toggle treating missing fields as null.
    #[must_use]
    pub fn with_missing_is_null(mut self, missing_is_null: bool) -> Self {
        self.missing_is_null = missing_is_null;
        self
    }

    /// Toggle caching of the scan.
    #[must_use]
    pub fn with_cache(mut self, cache: bool) -> Self {
        self.cache = cache;
        self
    }

    /// Toggle low-memory mode.
    #[must_use]
    pub fn with_low_memory(mut self, toggle: bool) -> Self {
        self.low_memory = toggle;
        self
    }

    /// Set the text encoding.
    #[must_use]
    pub fn with_encoding(mut self, enc: CsvEncoding) -> Self {
        self.encoding = enc;
        self
    }

    /// Toggle date parsing (requires the `temporal` feature).
    #[cfg(feature = "temporal")]
    #[must_use]
    pub fn with_try_parse_dates(mut self, toggle: bool) -> Self {
        self.try_parse_dates = toggle;
        self
    }

    /// Toggle raising an error on empty input.
    #[must_use]
    pub fn with_raise_if_empty(mut self, toggle: bool) -> Self {
        self.raise_if_empty = toggle;
        self
    }

    /// Toggle truncation of ragged lines.
    #[must_use]
    pub fn with_truncate_ragged_lines(mut self, toggle: bool) -> Self {
        self.truncate_ragged_lines = toggle;
        self
    }

    /// Toggle decimal-comma parsing.
    #[must_use]
    pub fn with_decimal_comma(mut self, toggle: bool) -> Self {
        self.decimal_comma = toggle;
        self
    }

    /// Toggle globbing.
    #[must_use]
    pub fn with_glob(mut self, toggle: bool) -> Self {
        self.glob = toggle;
        self
    }

    /// Infer the schema from the input, transform it with `f`, and fix it on the reader.
    ///
    /// The schema is inferred from the first file yielded by `iter_paths` (or from `path` when
    /// that yields `None`), using the current separator, header, quote, comment, eol, date and
    /// decimal settings. Any dtype overrides from `with_dtype_overwrite` are applied after `f`
    /// runs, and the result is stored with `with_schema`. Inference receives `&mut n_threads`
    /// and may update it.
    ///
    /// # Errors
    ///
    /// Returns an error if the glob matches no files, the file cannot be opened or mapped,
    /// schema inference fails, or `f` returns an error.
    pub fn with_schema_modify<F>(mut self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Schema) -> PolarsResult<Schema>,
    {
        let mut file = if let Some(mut paths) = self.iter_paths()? {
            let path = match paths.next() {
                Some(globresult) => globresult?,
                None => polars_bail!(ComputeError: "globbing pattern did not match any files"),
            };
            polars_utils::open_file(path)
        } else {
            polars_utils::open_file(&self.path)
        }?;
        // Propagate mapping/read failures instead of panicking.
        let reader_bytes = get_reader_bytes(&mut file)?;
        // Inference may adjust the skip count; keep the configured value untouched.
        let mut skip_rows = self.skip_rows;
        let (schema, _, _) = infer_file_schema(
            &reader_bytes,
            self.separator,
            self.infer_schema_length,
            self.has_header,
            // Overrides are applied after `f` runs (below), not during inference.
            None,
            &mut skip_rows,
            self.skip_rows_after_header,
            self.comment_prefix.as_ref(),
            self.quote_char,
            self.eol_char,
            // NOTE(review): configured `null_values` are not used during inference here;
            // confirm this is intended.
            None,
            self.try_parse_dates,
            self.raise_if_empty,
            &mut self.n_threads,
            self.decimal_comma,
        )?;
        let mut schema = f(schema)?;
        if let Some(overwrite_schema) = &self.schema_overwrite {
            for (name, dtype) in overwrite_schema.iter() {
                schema.with_column(name.clone(), dtype.clone());
            }
        }
        Ok(self.with_schema(Some(Arc::new(schema))))
    }
}
// Gated like the `LazyCsvReader` type itself, which only exists with the `csv` feature.
#[cfg(feature = "csv")]
impl LazyFileListReader for LazyCsvReader {
    /// Build a CSV scan of `self.path` from every configured option, without glob expansion.
    ///
    /// File caching is switched on in the resulting frame's optimization state.
    fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
        let mut lf: LazyFrame = DslBuilder::scan_csv(
            self.path,
            self.separator,
            self.has_header,
            self.ignore_errors,
            self.skip_rows,
            self.n_rows,
            self.cache,
            self.schema,
            self.schema_overwrite,
            self.low_memory,
            self.comment_prefix,
            self.quote_char,
            self.eol_char,
            self.null_values,
            self.infer_schema_length,
            self.rechunk,
            self.skip_rows_after_header,
            self.encoding,
            self.row_index,
            self.try_parse_dates,
            self.raise_if_empty,
            self.truncate_ragged_lines,
            self.n_threads,
            self.decimal_comma,
        )?
        .build()
        .into();
        lf.opt_state.file_caching = true;
        Ok(lf)
    }

    /// Whether globbing is enabled.
    fn glob(&self) -> bool {
        self.glob
    }

    /// The single configured input path.
    fn path(&self) -> &Path {
        &self.path
    }

    /// The explicit list of input paths.
    fn paths(&self) -> &[PathBuf] {
        &self.paths
    }

    /// Replace the single input path.
    fn with_path(mut self, path: PathBuf) -> Self {
        self.path = path;
        self
    }

    /// Replace the list of input paths.
    fn with_paths(mut self, paths: Arc<[PathBuf]>) -> Self {
        self.paths = paths;
        self
    }

    /// Set (or clear) the row limit.
    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
        self.n_rows = n_rows.into();
        self
    }

    /// Set (or clear) the row-index column.
    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.row_index = row_index.into();
        self
    }

    /// Whether rechunking is enabled.
    fn rechunk(&self) -> bool {
        self.rechunk
    }

    /// Toggle rechunking.
    #[must_use]
    fn with_rechunk(mut self, toggle: bool) -> Self {
        self.rechunk = toggle;
        self
    }

    /// The configured row limit.
    fn n_rows(&self) -> Option<usize> {
        self.n_rows
    }

    /// The configured row-index column, if any.
    fn row_index(&self) -> Option<&RowIndex> {
        self.row_index.as_ref()
    }

    /// Concatenate per-file frames as a partitioned dataset.
    ///
    /// The concatenation is sequential, does not cast to supertypes, and rechunks according to
    /// `self.rechunk()`.
    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
        let args = UnionArgs {
            rechunk: self.rechunk(),
            parallel: false,
            to_supertypes: false,
            from_partitioned_ds: true,
            ..Default::default()
        };
        // Resolves to the free function `concat_impl`, not this method.
        concat_impl(&lfs, args)
    }
}