hoomd_utility/data.rs
1// Copyright (c) 2024-2026 The Regents of the University of Michigan.
2// Part of hoomd-rs, released under the BSD 3-Clause License.
3
4//! Data logging methods.
5
6use parquet::{
7 file::{properties::WriterProperties, writer::SerializedFileWriter},
8 record::RecordWriter,
9};
10use std::{fs::File, io, path::Path, sync::Arc};
11use thiserror::Error;
12
/// Number of records buffered by default before a write: $` 2^{17} `$ (131072).
const DEFAULT_MAXIMUM_BUFFER_SIZE: usize = 1 << 17;
15
/// Enumerate possible sources of error when writing log files.
///
/// Each variant wraps the underlying error. The `#[from]` attributes let `?`
/// convert [`io::Error`] and [`parquet::errors::ParquetError`] values into
/// this type automatically.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum Error {
    /// Encountered an I/O error.
    #[error("I/O error")]
    IO(#[from] io::Error),

    /// Encountered an error reported by the `parquet` crate.
    #[error("Parquet error")]
    Parquet(#[from] parquet::errors::ParquetError),
}
28
29/// Create a unique file by appending an integer index.
30///
31/// The first file created will be "{path}.1". If that exists,
32/// the function will create "{path}.2" and so on.
33fn create_unique_file<P: AsRef<Path>>(path: P) -> io::Result<File> {
34 let mut index: u32 = 0;
35
36 loop {
37 let numbered_path = path.as_ref().with_added_extension(index.to_string());
38
39 match File::create_new(numbered_path) {
40 Ok(file) => return Ok(file),
41 Err(error) => match error.kind() {
42 io::ErrorKind::AlreadyExists => (),
43 _ => return Err(error),
44 },
45 }
46
47 index += 1;
48 }
49}
50
/// Write log records to a Parquet data file.
///
/// Use `ParquetLogger` to open a parquet file and write one log record at a
/// time. `ParquetLogger` buffers up to [`maximum_buffer_size`] log records in
/// memory and then synchronizes the buffer to the file.
///
/// Derive `ParquetRecordWriter` on your log record struct, [`create`] a
/// `ParquetLogger`, and [`log`] records to the file.
///
/// When the logger is dropped, records still held in the buffer are written
/// and the file is finished. Errors raised at that point are discarded.
///
/// [`create`]: Self::create
/// [`log`]: Self::log
/// [`maximum_buffer_size`]: Self::maximum_buffer_size
///
/// # Example
/// ```
/// use hoomd_utility::data::ParquetLogger;
/// use parquet_derive::ParquetRecordWriter;
///
/// #[derive(ParquetRecordWriter)]
/// pub struct LogRecord {
///     /// The simulation step.
///     step: u64,
///
///     /// Total system potential energy.
///     potential_energy: f64,
/// }
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use tempfile::tempdir;
/// # let tmp_dir = tempdir().expect("temp dir should be created");
/// # let path = tmp_dir.path().join("log.parquet");
/// // let path = "log.parquet";
/// let mut parquet_logger = ParquetLogger::<LogRecord>::create(path)?;
/// parquet_logger.log(LogRecord {
///     step: 0,
///     potential_energy: 1.0,
/// })?;
/// parquet_logger.log(LogRecord {
///     step: 1,
///     potential_energy: 2.0,
/// })?;
///
/// # Ok(())
/// # }
/// ```
pub struct ParquetLogger<T>
where
    // The bound is stated on the struct itself because the `Drop`
    // implementation needs it, and a `Drop` impl may not add bounds beyond
    // those of the type.
    for<'a> &'a [T]: RecordWriter<T>,
{
    /// Parquet writer.
    writer: SerializedFileWriter<File>,

    /// Logged records that have not been written to the file.
    buffer: Vec<T>,

    /// Buffer at most this many records.
    maximum_buffer_size: usize,
}
109
110impl<T> ParquetLogger<T>
111where
112 for<'a> &'a [T]: RecordWriter<T>,
113{
114 /// Create a new parquet file.
115 ///
116 /// `create` *overwrites* any existing file at `path`. The default
117 /// [`maximum_buffer_size`] is $` 2^{17} `$ records.
118 ///
119 /// [`maximum_buffer_size`]: Self::maximum_buffer_size
120 ///
121 /// # Errors
122 ///
123 /// Returns [`Error`] when there is I/O error opening the file
124 /// or `parquet` fails to initialize the file.
125 ///
126 /// [`Error`]: enum@crate::data::Error
127 ///
128 /// # Example
129 /// ```
130 /// use hoomd_utility::data::ParquetLogger;
131 /// use parquet_derive::ParquetRecordWriter;
132 ///
133 /// #[derive(ParquetRecordWriter)]
134 /// pub struct LogRecord {
135 /// /// The simulation step.
136 /// step: u64,
137 ///
138 /// /// Total system potential energy.
139 /// potential_energy: f64,
140 /// }
141 ///
142 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
143 /// # use tempfile::tempdir;
144 /// # let tmp_dir = tempdir().expect("temp dir should be created");
145 /// # let path = tmp_dir.path().join("log.parquet");
146 /// // let path = "log.parquet";
147 /// let mut parquet_logger = ParquetLogger::<LogRecord>::create(path)?;
148 /// # Ok(())
149 /// # }
150 /// ```
151 #[inline]
152 pub fn create<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
153 let buffer = Vec::<T>::new();
154 let schema = buffer.as_slice().schema()?;
155 let props = Arc::new(WriterProperties::builder().build());
156 let log_file = File::create(path)?;
157 let writer = SerializedFileWriter::new(log_file, schema, props)?;
158
159 Ok(Self {
160 writer,
161 buffer,
162 maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
163 })
164 }
165
166 /// Create a unique parquet file.
167 ///
168 /// Parquet files cannot be appended to once written. The accepted solution
169 /// by the Parquet developers is to create many files (`file.parquet.0`,
170 /// `file.parquet.1`, and so on) that you concatenate on read.
171 ///
172 /// `create_unique` facilitates this process by appending `.0`, `.1`, ... `.{N}`.
173 /// to the given `path`. `create_unique` attempts all extensions in order,
174 /// continuing to the next increment each time it finds an existing file.
175 ///
176 /// The default [`maximum_buffer_size`] is $` 2^{17} `$ records.
177 ///
178 /// [`maximum_buffer_size`]: Self::maximum_buffer_size
179 ///
180 /// # Errors
181 ///
182 /// Returns [`Error`] when there is I/O error opening the file
183 /// or `parquet` fails to initialize the file.
184 ///
185 /// [`Error`]: enum@crate::data::Error
186 ///
187 /// # Example
188 /// ```
189 /// use hoomd_utility::data::ParquetLogger;
190 /// use parquet_derive::ParquetRecordWriter;
191 ///
192 /// #[derive(ParquetRecordWriter)]
193 /// pub struct LogRecord {
194 /// /// The simulation step.
195 /// step: u64,
196 ///
197 /// /// Total system potential energy.
198 /// potential_energy: f64,
199 /// }
200 ///
201 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
202 /// # use tempfile::tempdir;
203 /// # let tmp_dir = tempdir().expect("temp dir should be created");
204 /// # let path = tmp_dir.path().join("log.parquet");
205 /// // let path = "log.parquet";
206 /// let mut parquet_logger = ParquetLogger::<LogRecord>::create_unique(path)?;
207 /// # Ok(())
208 /// # }
209 /// ```
210 #[inline]
211 pub fn create_unique<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
212 let buffer = Vec::<T>::new();
213 let schema = buffer.as_slice().schema()?;
214 let props = Arc::new(WriterProperties::builder().build());
215 let log_file = create_unique_file(path)?;
216 let writer = SerializedFileWriter::new(log_file, schema, props)?;
217
218 Ok(Self {
219 writer,
220 buffer,
221 maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
222 })
223 }
224
225 /// Log a record to the file.
226 ///
227 /// `log` buffers up to [`maximum_buffer_size`] records in memory before
228 /// synchronizing them to the file. Call `log` once for each record you
229 /// produce during a simulation.
230 ///
231 /// [`maximum_buffer_size`]: Self::maximum_buffer_size
232 ///
233 /// # Errors
234 ///
235 /// Returns [`Error`] when there is I/O error writing to the file
236 /// or `parquet` is unable to write the buffer.
237 ///
238 /// [`Error`]: enum@crate::data::Error
239 ///
240 /// # Example
241 /// ```
242 /// use hoomd_utility::data::ParquetLogger;
243 /// use parquet_derive::ParquetRecordWriter;
244 ///
245 /// #[derive(ParquetRecordWriter)]
246 /// pub struct LogRecord {
247 /// /// The simulation step.
248 /// step: u64,
249 ///
250 /// /// Total system potential energy.
251 /// potential_energy: f64,
252 /// }
253 ///
254 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
255 /// # use tempfile::tempdir;
256 /// # let tmp_dir = tempdir().expect("temp dir should be created");
257 /// # let path = tmp_dir.path().join("log.parquet");
258 /// // let path = "log.parquet";
259 /// let mut parquet_logger = ParquetLogger::<LogRecord>::create(path)?;
260 /// parquet_logger.log(LogRecord {
261 /// step: 0,
262 /// potential_energy: 1.0,
263 /// })?;
264 /// parquet_logger.log(LogRecord {
265 /// step: 1,
266 /// potential_energy: 2.0,
267 /// })?;
268 ///
269 /// # Ok(())
270 /// # }
271 /// ```
272 #[inline]
273 pub fn log(&mut self, record: T) -> Result<(), Error> {
274 self.buffer.push(record);
275
276 if self.buffer.len() >= self.maximum_buffer_size {
277 self.sync()?;
278 }
279
280 Ok(())
281 }
282
283 /// Maximum number of log records to buffer in memory.
284 #[inline]
285 #[must_use]
286 pub fn maximum_buffer_size(&self) -> usize {
287 self.maximum_buffer_size
288 }
289
290 /// Mutable reference to the maximum number of log records to buffer in memory.
291 #[inline]
292 pub fn maximum_buffer_size_mut(&mut self) -> &mut usize {
293 &mut self.maximum_buffer_size
294 }
295
296 /// Synchronize the buffer to the file.
297 ///
298 /// `sync` writes all currently buffered log records to a row group. A single
299 /// parquet file can contain at most $` 2^{15} `$ row groups, so call `sync`
300 /// only at specific points when data must be on disk.
301 ///
302 /// All buffered data is *automatically* synchronized when the logger is
303 /// dropped.
304 ///
305 /// # Errors
306 ///
307 /// Returns [`Error`] when there is I/O error writing to the file
308 /// or `parquet` is unable to write the buffer.
309 ///
310 /// [`Error`]: enum@crate::data::Error
311 #[inline]
312 pub fn sync(&mut self) -> Result<(), Error> {
313 if !self.buffer.is_empty() {
314 let mut row_group = self.writer.next_row_group()?;
315 self.buffer.as_slice().write_to_row_group(&mut row_group)?;
316 row_group.close()?;
317 self.buffer.clear();
318 self.writer.flush()?;
319 }
320 Ok(())
321 }
322}
323
324/// Synchronize the buffer and close the parquet file.
325impl<T> Drop for ParquetLogger<T>
326where
327 for<'a> &'a [T]: RecordWriter<T>,
328{
329 #[inline]
330 fn drop(&mut self) {
331 let _ = self.sync();
332 let _ = self.writer.finish();
333 }
334}
335
#[cfg(test)]
mod tests {
    use std::fs;

    use super::*;
    use assert2::check;
    use parquet_derive::ParquetRecordWriter;
    use tempfile::tempdir;

    /// Minimal record type used to instantiate `ParquetLogger`.
    #[derive(ParquetRecordWriter)]
    struct LogRecord {
        a: f64,
    }

    /// Each `create_unique` call must claim the lowest index not yet in use.
    #[test]
    fn test_create_unique() -> anyhow::Result<()> {
        let tmp_dir = tempdir()?;
        let base = tmp_dir.path().join("test.parquet");

        for created in 1..=3_u32 {
            // Drop the logger immediately; only the file it created matters.
            drop(ParquetLogger::<LogRecord>::create_unique(&base)?);

            // Exactly the first `created` numbered files exist.
            for index in 0..3_u32 {
                let exists = fs::exists(base.with_added_extension(index.to_string()))?;
                check!(exists == (index < created));
            }
        }

        Ok(())
    }
}