hoomd_utility/
data.rs

1// Copyright (c) 2024-2026 The Regents of the University of Michigan.
2// Part of hoomd-rs, released under the BSD 3-Clause License.
3
4//! Data logging methods.
5
6use parquet::{
7    file::{properties::WriterProperties, writer::SerializedFileWriter},
8    record::RecordWriter,
9};
10use std::{fs::File, io, path::Path, sync::Arc};
11use thiserror::Error;
12
/// Number of records a new logger buffers by default: 2^17 (131,072).
const DEFAULT_MAXIMUM_BUFFER_SIZE: usize = 1 << 17;
15
/// Enumerate possible sources of error when writing log files.
///
/// Each variant wraps its source error with `#[from]`, so the `?` operator
/// converts [`io::Error`] and [`parquet::errors::ParquetError`] values into
/// this type automatically.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum Error {
    /// Encountered an I/O error.
    #[error("I/O error")]
    IO(#[from] io::Error),

    /// Encountered an error reported by the `parquet` crate.
    #[error("Parquet error")]
    Parquet(#[from] parquet::errors::ParquetError),
}
28
29/// Create a unique file by appending an integer index.
30///
31/// The first file created will be "{path}.1". If that exists,
32/// the function will create "{path}.2" and so on.
33fn create_unique_file<P: AsRef<Path>>(path: P) -> io::Result<File> {
34    let mut index: u32 = 0;
35
36    loop {
37        let numbered_path = path.as_ref().with_added_extension(index.to_string());
38
39        match File::create_new(numbered_path) {
40            Ok(file) => return Ok(file),
41            Err(error) => match error.kind() {
42                io::ErrorKind::AlreadyExists => (),
43                _ => return Err(error),
44            },
45        }
46
47        index += 1;
48    }
49}
50
/// Write log records to a Parquet data file.
///
/// Use `ParquetLogger` to open a parquet file and write one log record at a
/// time. `ParquetLogger` buffers up to [`maximum_buffer_size`] log records in
/// memory and then synchronizes the buffer to the file.
///
/// Derive `ParquetRecordWriter` on your log record struct, [`create`] a
/// `ParquetLogger`, and [`log`] records to the file.
///
/// Dropping the logger synchronizes any records still in the buffer and
/// closes the file. Errors that occur while dropping are discarded, so call
/// [`sync`] first when a failed write must be detected.
///
/// [`create`]: Self::create
/// [`log`]: Self::log
/// [`maximum_buffer_size`]: Self::maximum_buffer_size
/// [`sync`]: Self::sync
///
/// # Example
/// ```
/// use hoomd_utility::data::ParquetLogger;
/// use parquet_derive::ParquetRecordWriter;
///
/// #[derive(ParquetRecordWriter)]
/// pub struct LogRecord {
///     /// The simulation step.
///     step: u64,
///
///     /// Total system potential energy.
///     potential_energy: f64,
/// }
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use tempfile::tempdir;
/// # let tmp_dir = tempdir().expect("temp dir should be created");
/// # let path = tmp_dir.path().join("log.parquet");
/// // let path = "log.parquet";
/// let mut parquet_logger = ParquetLogger::<LogRecord>::create(path)?;
/// parquet_logger.log(LogRecord {
///     step: 0,
///     potential_energy: 1.0,
/// })?;
/// parquet_logger.log(LogRecord {
///     step: 1,
///     potential_energy: 2.0,
/// })?;
///
/// # Ok(())
/// # }
/// ```
pub struct ParquetLogger<T>
where
    for<'a> &'a [T]: RecordWriter<T>,
{
    /// Parquet writer that owns the output file.
    writer: SerializedFileWriter<File>,

    /// Logged records that have not been written to the file.
    buffer: Vec<T>,

    /// Buffer at most this many records; `log` synchronizes the buffer once
    /// it holds this many.
    maximum_buffer_size: usize,
}
109
110impl<T> ParquetLogger<T>
111where
112    for<'a> &'a [T]: RecordWriter<T>,
113{
114    /// Create a new parquet file.
115    ///
116    /// `create` *overwrites* any existing file at `path`. The default
117    /// [`maximum_buffer_size`] is $` 2^{17} `$ records.
118    ///
119    /// [`maximum_buffer_size`]: Self::maximum_buffer_size
120    ///
121    /// # Errors
122    ///
123    /// Returns [`Error`] when there is I/O error opening the file
124    /// or `parquet` fails to initialize the file.
125    ///
126    /// [`Error`]: enum@crate::data::Error
127    ///
128    /// # Example
129    /// ```
130    /// use hoomd_utility::data::ParquetLogger;
131    /// use parquet_derive::ParquetRecordWriter;
132    ///
133    /// #[derive(ParquetRecordWriter)]
134    /// pub struct LogRecord {
135    ///     /// The simulation step.
136    ///     step: u64,
137    ///
138    ///     /// Total system potential energy.
139    ///     potential_energy: f64,
140    /// }
141    ///
142    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
143    /// # use tempfile::tempdir;
144    /// # let tmp_dir = tempdir().expect("temp dir should be created");
145    /// # let path = tmp_dir.path().join("log.parquet");
146    /// // let path = "log.parquet";
147    /// let mut parquet_logger = ParquetLogger::<LogRecord>::create(path)?;
148    /// # Ok(())
149    /// # }
150    /// ```
151    #[inline]
152    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
153        let buffer = Vec::<T>::new();
154        let schema = buffer.as_slice().schema()?;
155        let props = Arc::new(WriterProperties::builder().build());
156        let log_file = File::create(path)?;
157        let writer = SerializedFileWriter::new(log_file, schema, props)?;
158
159        Ok(Self {
160            writer,
161            buffer,
162            maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
163        })
164    }
165
166    /// Create a unique parquet file.
167    ///
168    /// Parquet files cannot be appended to once written. The accepted solution
169    /// by the Parquet developers is to create many files (`file.parquet.0`,
170    /// `file.parquet.1`, and so on) that you concatenate on read.
171    ///
172    /// `create_unique` facilitates this process by appending `.0`, `.1`, ... `.{N}`.
173    /// to the given `path`. `create_unique` attempts all extensions in order,
174    /// continuing to the next increment each time it finds an existing file.
175    ///
176    /// The default [`maximum_buffer_size`] is $` 2^{17} `$ records.
177    ///
178    /// [`maximum_buffer_size`]: Self::maximum_buffer_size
179    ///
180    /// # Errors
181    ///
182    /// Returns [`Error`] when there is I/O error opening the file
183    /// or `parquet` fails to initialize the file.
184    ///
185    /// [`Error`]: enum@crate::data::Error
186    ///
187    /// # Example
188    /// ```
189    /// use hoomd_utility::data::ParquetLogger;
190    /// use parquet_derive::ParquetRecordWriter;
191    ///
192    /// #[derive(ParquetRecordWriter)]
193    /// pub struct LogRecord {
194    ///     /// The simulation step.
195    ///     step: u64,
196    ///
197    ///     /// Total system potential energy.
198    ///     potential_energy: f64,
199    /// }
200    ///
201    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
202    /// # use tempfile::tempdir;
203    /// # let tmp_dir = tempdir().expect("temp dir should be created");
204    /// # let path = tmp_dir.path().join("log.parquet");
205    /// // let path = "log.parquet";
206    /// let mut parquet_logger = ParquetLogger::<LogRecord>::create_unique(path)?;
207    /// # Ok(())
208    /// # }
209    /// ```
210    #[inline]
211    pub fn create_unique<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
212        let buffer = Vec::<T>::new();
213        let schema = buffer.as_slice().schema()?;
214        let props = Arc::new(WriterProperties::builder().build());
215        let log_file = create_unique_file(path)?;
216        let writer = SerializedFileWriter::new(log_file, schema, props)?;
217
218        Ok(Self {
219            writer,
220            buffer,
221            maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
222        })
223    }
224
225    /// Log a record to the file.
226    ///
227    /// `log` buffers up to [`maximum_buffer_size`] records in memory before
228    /// synchronizing them to the file. Call `log` once for each record you
229    /// produce during a simulation.
230    ///
231    /// [`maximum_buffer_size`]: Self::maximum_buffer_size
232    ///
233    /// # Errors
234    ///
235    /// Returns [`Error`] when there is I/O error writing to the file
236    /// or `parquet` is unable to write the buffer.
237    ///
238    /// [`Error`]: enum@crate::data::Error
239    ///
240    /// # Example
241    /// ```
242    /// use hoomd_utility::data::ParquetLogger;
243    /// use parquet_derive::ParquetRecordWriter;
244    ///
245    /// #[derive(ParquetRecordWriter)]
246    /// pub struct LogRecord {
247    ///     /// The simulation step.
248    ///     step: u64,
249    ///
250    ///     /// Total system potential energy.
251    ///     potential_energy: f64,
252    /// }
253    ///
254    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
255    /// # use tempfile::tempdir;
256    /// # let tmp_dir = tempdir().expect("temp dir should be created");
257    /// # let path = tmp_dir.path().join("log.parquet");
258    /// // let path = "log.parquet";
259    /// let mut parquet_logger = ParquetLogger::<LogRecord>::create(path)?;
260    /// parquet_logger.log(LogRecord {
261    ///     step: 0,
262    ///     potential_energy: 1.0,
263    /// })?;
264    /// parquet_logger.log(LogRecord {
265    ///     step: 1,
266    ///     potential_energy: 2.0,
267    /// })?;
268    ///
269    /// # Ok(())
270    /// # }
271    /// ```
272    #[inline]
273    pub fn log(&mut self, record: T) -> Result<(), Error> {
274        self.buffer.push(record);
275
276        if self.buffer.len() >= self.maximum_buffer_size {
277            self.sync()?;
278        }
279
280        Ok(())
281    }
282
283    /// Maximum number of log records to buffer in memory.
284    #[inline]
285    #[must_use]
286    pub fn maximum_buffer_size(&self) -> usize {
287        self.maximum_buffer_size
288    }
289
290    /// Mutable reference to the maximum number of log records to buffer in memory.
291    #[inline]
292    pub fn maximum_buffer_size_mut(&mut self) -> &mut usize {
293        &mut self.maximum_buffer_size
294    }
295
296    /// Synchronize the buffer to the file.
297    ///
298    /// `sync` writes all currently buffered log records to a row group. A single
299    /// parquet file can contain at most $` 2^{15} `$ row groups, so call `sync`
300    /// only at specific points when data must be on disk.
301    ///
302    /// All buffered data is *automatically* synchronized when the logger is
303    /// dropped.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`Error`] when there is I/O error writing to the file
308    /// or `parquet` is unable to write the buffer.
309    ///
310    /// [`Error`]: enum@crate::data::Error
311    #[inline]
312    pub fn sync(&mut self) -> Result<(), Error> {
313        if !self.buffer.is_empty() {
314            let mut row_group = self.writer.next_row_group()?;
315            self.buffer.as_slice().write_to_row_group(&mut row_group)?;
316            row_group.close()?;
317            self.buffer.clear();
318            self.writer.flush()?;
319        }
320        Ok(())
321    }
322}
323
/// Synchronize the buffer and close the parquet file.
///
/// `drop` cannot return an error, so failures from [`sync`] and from
/// finishing the writer are discarded here. Call [`sync`] explicitly
/// beforehand when a failed write must be observed.
///
/// [`sync`]: ParquetLogger::sync
impl<T> Drop for ParquetLogger<T>
where
    for<'a> &'a [T]: RecordWriter<T>,
{
    #[inline]
    fn drop(&mut self) {
        // Write any records still held in the buffer; the result is ignored.
        let _ = self.sync();
        // Finish the writer after the final sync; the result is ignored.
        let _ = self.writer.finish();
    }
}
335
#[cfg(test)]
mod tests {
    use std::fs;

    use super::*;
    use assert2::check;
    use parquet_derive::ParquetRecordWriter;
    use tempfile::tempdir;

    /// Minimal record type used to instantiate `ParquetLogger`.
    #[derive(ParquetRecordWriter)]
    struct LogRecord {
        a: f64,
    }

    /// Every call to `create_unique` claims the lowest unused numeric suffix
    /// and leaves the higher ones untouched.
    #[test]
    fn test_create_unique() -> anyhow::Result<()> {
        let tmp_dir = tempdir()?;
        let path = tmp_dir.path().join("test.parquet");

        for files_created in 1..=3_u32 {
            // Create the next file and close it immediately.
            drop(ParquetLogger::<LogRecord>::create_unique(&path)?);

            // Suffixes below `files_created` exist, the remaining ones do not.
            for index in 0..3_u32 {
                let numbered_path = path.with_added_extension(index.to_string());
                check!(fs::exists(numbered_path)? == (index < files_created));
            }
        }

        Ok(())
    }
}