xref: /drstd/src/std/io/buffered/linewritershim.rs (revision 9670759b785600bf6315e4173e46a602f16add7a)
1 use crate::std::io::{self, BufWriter, IoSlice, Write};
2 use crate::std::sys_common::memchr;
3 
/// Line-buffering adapter layered on top of a mutably borrowed `BufWriter`.
///
/// The shim owns nothing: it only holds `&mut BufWriter<W>` for the duration
/// of a write and drives that writer through its internal helpers (such as
/// `flush_buf` and `write_to_buf`) rather than through the public
/// `write`/`flush` pair. Having access to those internals lets the
/// line-buffering logic be more efficient without copying the bulk of
/// `BufWriter`'s implementation.
///
/// Because the wrapping is temporary, an existing `BufWriter` can be given
/// line-buffered behaviour for individual calls and used as a plain block
/// buffer otherwise; this is what lets Stdout switch between line-buffered
/// and block-buffered modes.
#[derive(Debug)]
pub struct LineWriterShim<'a, W>
where
    W: ?Sized + Write,
{
    /// The borrowed buffered writer whose buffer and inner writer the shim
    /// operates on.
    buffer: &'a mut BufWriter<W>,
}
17 
18 impl<'a, W: ?Sized + Write> LineWriterShim<'a, W> {
new(buffer: &'a mut BufWriter<W>) -> Self19     pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
20         Self { buffer }
21     }
22 
23     /// Get a reference to the inner writer (that is, the writer
24     /// wrapped by the BufWriter).
inner(&self) -> &W25     fn inner(&self) -> &W {
26         self.buffer.get_ref()
27     }
28 
29     /// Get a mutable reference to the inner writer (that is, the writer
30     /// wrapped by the BufWriter). Be careful with this writer, as writes to
31     /// it will bypass the buffer.
inner_mut(&mut self) -> &mut W32     fn inner_mut(&mut self) -> &mut W {
33         self.buffer.get_mut()
34     }
35 
36     /// Get the content currently buffered in self.buffer
buffered(&self) -> &[u8]37     fn buffered(&self) -> &[u8] {
38         self.buffer.buffer()
39     }
40 
41     /// Flush the buffer iff the last byte is a newline (indicating that an
42     /// earlier write only succeeded partially, and we want to retry flushing
43     /// the buffered line before continuing with a subsequent write)
flush_if_completed_line(&mut self) -> io::Result<()>44     fn flush_if_completed_line(&mut self) -> io::Result<()> {
45         match self.buffered().last().copied() {
46             Some(b'\n') => self.buffer.flush_buf(),
47             _ => Ok(()),
48         }
49     }
50 }
51 
52 impl<'a, W: ?Sized + Write> Write for LineWriterShim<'a, W> {
53     /// Write some data into this BufReader with line buffering. This means
54     /// that, if any newlines are present in the data, the data up to the last
55     /// newline is sent directly to the underlying writer, and data after it
56     /// is buffered. Returns the number of bytes written.
57     ///
58     /// This function operates on a "best effort basis"; in keeping with the
59     /// convention of `Write::write`, it makes at most one attempt to write
60     /// new data to the underlying writer. If that write only reports a partial
61     /// success, the remaining data will be buffered.
62     ///
63     /// Because this function attempts to send completed lines to the underlying
64     /// writer, it will also flush the existing buffer if it ends with a
65     /// newline, even if the incoming data does not contain any newlines.
write(&mut self, buf: &[u8]) -> io::Result<usize>66     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
67         let newline_idx = match memchr::memrchr(b'\n', buf) {
68             // If there are no new newlines (that is, if this write is less than
69             // one line), just do a regular buffered write (which may flush if
70             // we exceed the inner buffer's size)
71             None => {
72                 self.flush_if_completed_line()?;
73                 return self.buffer.write(buf);
74             }
75             // Otherwise, arrange for the lines to be written directly to the
76             // inner writer.
77             Some(newline_idx) => newline_idx + 1,
78         };
79 
80         // Flush existing content to prepare for our write. We have to do this
81         // before attempting to write `buf` in order to maintain consistency;
82         // if we add `buf` to the buffer then try to flush it all at once,
83         // we're obligated to return Ok(), which would mean suppressing any
84         // errors that occur during flush.
85         self.buffer.flush_buf()?;
86 
87         // This is what we're going to try to write directly to the inner
88         // writer. The rest will be buffered, if nothing goes wrong.
89         let lines = &buf[..newline_idx];
90 
91         // Write `lines` directly to the inner writer. In keeping with the
92         // `write` convention, make at most one attempt to add new (unbuffered)
93         // data. Because this write doesn't touch the BufWriter state directly,
94         // and the buffer is known to be empty, we don't need to worry about
95         // self.buffer.panicked here.
96         let flushed = self.inner_mut().write(lines)?;
97 
98         // If buffer returns Ok(0), propagate that to the caller without
99         // doing additional buffering; otherwise we're just guaranteeing
100         // an "ErrorKind::WriteZero" later.
101         if flushed == 0 {
102             return Ok(0);
103         }
104 
105         // Now that the write has succeeded, buffer the rest (or as much of
106         // the rest as possible). If there were any unwritten newlines, we
107         // only buffer out to the last unwritten newline that fits in the
108         // buffer; this helps prevent flushing partial lines on subsequent
109         // calls to LineWriterShim::write.
110 
111         // Handle the cases in order of most-common to least-common, under
112         // the presumption that most writes succeed in totality, and that most
113         // writes are smaller than the buffer.
114         // - Is this a partial line (ie, no newlines left in the unwritten tail)
115         // - If not, does the data out to the last unwritten newline fit in
116         //   the buffer?
117         // - If not, scan for the last newline that *does* fit in the buffer
118         let tail = if flushed >= newline_idx {
119             &buf[flushed..]
120         } else if newline_idx - flushed <= self.buffer.capacity() {
121             &buf[flushed..newline_idx]
122         } else {
123             let scan_area = &buf[flushed..];
124             let scan_area = &scan_area[..self.buffer.capacity()];
125             match memchr::memrchr(b'\n', scan_area) {
126                 Some(newline_idx) => &scan_area[..newline_idx + 1],
127                 None => scan_area,
128             }
129         };
130 
131         let buffered = self.buffer.write_to_buf(tail);
132         Ok(flushed + buffered)
133     }
134 
flush(&mut self) -> io::Result<()>135     fn flush(&mut self) -> io::Result<()> {
136         self.buffer.flush()
137     }
138 
139     /// Write some vectored data into this BufReader with line buffering. This
140     /// means that, if any newlines are present in the data, the data up to
141     /// and including the buffer containing the last newline is sent directly
142     /// to the inner writer, and the data after it is buffered. Returns the
143     /// number of bytes written.
144     ///
145     /// This function operates on a "best effort basis"; in keeping with the
146     /// convention of `Write::write`, it makes at most one attempt to write
147     /// new data to the underlying writer.
148     ///
149     /// Because this function attempts to send completed lines to the underlying
150     /// writer, it will also flush the existing buffer if it contains any
151     /// newlines.
152     ///
153     /// Because sorting through an array of `IoSlice` can be a bit convoluted,
154     /// This method differs from write in the following ways:
155     ///
156     /// - It attempts to write the full content of all the buffers up to and
157     ///   including the one containing the last newline. This means that it
158     ///   may attempt to write a partial line, that buffer has data past the
159     ///   newline.
160     /// - If the write only reports partial success, it does not attempt to
161     ///   find the precise location of the written bytes and buffer the rest.
162     ///
163     /// If the underlying vector doesn't support vectored writing, we instead
164     /// simply write the first non-empty buffer with `write`. This way, we
165     /// get the benefits of more granular partial-line handling without losing
166     /// anything in efficiency
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>167     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
168         // If there's no specialized behavior for write_vectored, just use
169         // write. This has the benefit of more granular partial-line handling.
170         if !self.is_write_vectored() {
171             return match bufs.iter().find(|buf| !buf.is_empty()) {
172                 Some(buf) => self.write(buf),
173                 None => Ok(0),
174             };
175         }
176 
177         // Find the buffer containing the last newline
178         let last_newline_buf_idx = bufs
179             .iter()
180             .enumerate()
181             .rev()
182             .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
183 
184         // If there are no new newlines (that is, if this write is less than
185         // one line), just do a regular buffered write
186         let last_newline_buf_idx = match last_newline_buf_idx {
187             // No newlines; just do a normal buffered write
188             None => {
189                 self.flush_if_completed_line()?;
190                 return self.buffer.write_vectored(bufs);
191             }
192             Some(i) => i,
193         };
194 
195         // Flush existing content to prepare for our write
196         self.buffer.flush_buf()?;
197 
198         // This is what we're going to try to write directly to the inner
199         // writer. The rest will be buffered, if nothing goes wrong.
200         let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
201 
202         // Write `lines` directly to the inner writer. In keeping with the
203         // `write` convention, make at most one attempt to add new (unbuffered)
204         // data. Because this write doesn't touch the BufWriter state directly,
205         // and the buffer is known to be empty, we don't need to worry about
206         // self.panicked here.
207         let flushed = self.inner_mut().write_vectored(lines)?;
208 
209         // If inner returns Ok(0), propagate that to the caller without
210         // doing additional buffering; otherwise we're just guaranteeing
211         // an "ErrorKind::WriteZero" later.
212         if flushed == 0 {
213             return Ok(0);
214         }
215 
216         // Don't try to reconstruct the exact amount written; just bail
217         // in the event of a partial write
218         let lines_len = lines.iter().map(|buf| buf.len()).sum();
219         if flushed < lines_len {
220             return Ok(flushed);
221         }
222 
223         // Now that the write has succeeded, buffer the rest (or as much of the
224         // rest as possible)
225         let buffered: usize = tail
226             .iter()
227             .filter(|buf| !buf.is_empty())
228             .map(|buf| self.buffer.write_to_buf(buf))
229             .take_while(|&n| n > 0)
230             .sum();
231 
232         Ok(flushed + buffered)
233     }
234 
is_write_vectored(&self) -> bool235     fn is_write_vectored(&self) -> bool {
236         self.inner().is_write_vectored()
237     }
238 
239     /// Write some data into this BufReader with line buffering. This means
240     /// that, if any newlines are present in the data, the data up to the last
241     /// newline is sent directly to the underlying writer, and data after it
242     /// is buffered.
243     ///
244     /// Because this function attempts to send completed lines to the underlying
245     /// writer, it will also flush the existing buffer if it contains any
246     /// newlines, even if the incoming data does not contain any newlines.
write_all(&mut self, buf: &[u8]) -> io::Result<()>247     fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
248         match memchr::memrchr(b'\n', buf) {
249             // If there are no new newlines (that is, if this write is less than
250             // one line), just do a regular buffered write (which may flush if
251             // we exceed the inner buffer's size)
252             None => {
253                 self.flush_if_completed_line()?;
254                 self.buffer.write_all(buf)
255             }
256             Some(newline_idx) => {
257                 let (lines, tail) = buf.split_at(newline_idx + 1);
258 
259                 if self.buffered().is_empty() {
260                     self.inner_mut().write_all(lines)?;
261                 } else {
262                     // If there is any buffered data, we add the incoming lines
263                     // to that buffer before flushing, which saves us at least
264                     // one write call. We can't really do this with `write`,
265                     // since we can't do this *and* not suppress errors *and*
266                     // report a consistent state to the caller in a return
267                     // value, but here in write_all it's fine.
268                     self.buffer.write_all(lines)?;
269                     self.buffer.flush_buf()?;
270                 }
271 
272                 self.buffer.write_all(tail)
273             }
274         }
275     }
276 }
277