1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Mutex;

use polars_core::POOL;

use super::*;

impl LazyFrame {
    pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
        let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;

        let (tx, rx) = channel();
        let token = state.cancel_token();
        POOL.spawn_fifo(move || {
            let result = physical_plan.execute(&mut state);
            tx.send(result).unwrap();
        });

        Ok(InProcessQuery {
            rx: Arc::new(Mutex::new(rx)),
            token,
        })
    }
}

#[derive(Clone)]
pub struct InProcessQuery {
    rx: Arc<Mutex<Receiver<PolarsResult<DataFrame>>>>,
    token: Arc<AtomicBool>,
}

impl InProcessQuery {
    /// Cancel the query at earliest convenience.
    pub fn cancel(&self) {
        self.token.store(true, Ordering::Relaxed)
    }

    /// Fetch the result.

    /// If it is ready, a materialized DataFrame is returned.
    /// If it is not ready it will return `None`.
    pub fn fetch(&self) -> Option<PolarsResult<DataFrame>> {
        let rx = self.rx.lock().unwrap();
        rx.try_recv().ok()
    }

    /// Await the result synchronously.
    pub fn fetch_blocking(&self) -> PolarsResult<DataFrame> {
        let rx = self.rx.lock().unwrap();
        rx.recv().unwrap()
    }
}

impl Drop for InProcessQuery {
    fn drop(&mut self) {
        self.token.store(true, Ordering::Relaxed);
    }
}