Combine 3 - Partial parsing in Rust
Combine is a parser combinator library for the Rust programming language
I first announced version 3 of Combine back in August and back then I definitely expected to have a stable version by now.
However other projects (cough gluon cough) got in the way and Combine fell to the wayside.
It didn’t help that I didn’t have a killer feature for 3.0 either, user-defined error types make it possible to define parsers usable in #[no_std] crates which is great when you need it but it is still a fairly niche use-case.
Most of the other changes were about cleanup, removing deprecated items and generalizing some types and functions. The range::recognize parser deserves some special mention though. What it does is to take any Combine parser and makes it return a Range of the input, usually a slice (&str or &[u8]). This greatly increases the flexibility of range (zero-copy) parsers making it possible to write a simple float parser like this.
// Since `recognize` ignores the output of the parser it wraps we use `skip_`
// parsers to avoid allocating any containers for the digits
let mut parser = recognize((
    skip_many1(digit()),
    optional((token(b'.'), skip_many(digit()))),
)).and_then(|bs: &[u8]| {
    ::std::str::from_utf8(bs).unwrap().parse::<f64>()
});
parser.easy_parse(&b"123.45"[..]);
parser.easy_parse(&b"123"[..]);
As indicated by the title those aren’t all the changes coming in Combine 3.0. I have one additional big change that I put together over the last few weeks which should be especially useful as asynchronous programs becomes more common in Rust.
Dealing with different forms of input
Often when something needs to be parsed it is not the case that the full input is available at the time that we start parsing.
Perhaps if you only care about parsing files or already have your data’s size specified by another protocol such as HTTP, it is simple to just read the entire input into a Vec or String and parse the entire thing at once.
But other times the size information may either be unavailable (perhaps you are implementing protocol such as HTTP) or maybe the full input is simply to large to be kept in memory all at once.
In either case an easy solution is to parse directly from the io device using Read. That way the input will be consumed byte-by-byte and only as much data as the Read instance needs will be kept in memory at any point in time.
This is where the whole story about partial input would end if it weren’t for one little problem. Any time we read more input the device may not have it available. If that is the case we end up blocking the entire thread while waiting for more input from the device. Though sometimes an acceptable trade-off, that is certainly not the case for asynchronous programs whose entire abstraction hinges on each small task yielding control to other tasks while they wait for data to be available. If this contract is violated and we do block the thread we may block the progress of thousands of other tasks, potentially even causing them to timeout.
If we want to effectively parse in a asynchronous context we need a different solution, one that does not involve blocking for input and is able to parse regardless of how much input is available at any point in time.
Parsing partial input efficiently
Luckily there is a tried and true solution for this problem. By encoding the parser as a finite-state machine we can avoid all of the pitfalls explained above. Using a parser built in this way, we only attempt to parse a small part of the full input in each state. If it succeeds, everything is fine and we remove the input we parsed from the buffer to keep memory usage low and we move to the next state. If it fails because we happened to not have enough input then we simply ask the io device to fill the buffer with more input and then try again once it becomes available. Since we only failed to parse a small part of the full input we didn’t lose much progress either.
To be a bit less abstract, here is a parser for the JSON-RPC messages used in the language server protocol built as a state machine into a tokio_io::codec::Decoder.
To keep things simple this parser decodes the messages but does not do any JSON parsing.
Example message
Content-Length: 18
{ "some": "data" }
#[derive(Debug)]
enum State {
    Nothing,
    ContentLength(usize),
}
#[derive(Debug)]
pub struct LanguageServerDecoder {
    state: State,
}
macro_rules! decode {
    ($parser: expr, $src: expr) => { {
        let (output, removed_len) = {
            let opt = combine_decode($parser, &$src[..])
                .map_err(|err| {
                // Since err contains references into `src` we must replace these
                // before we can return an error or call `split_to`
                err.map_range(|r| {
                    str::from_utf8(r)
                        .ok()
                        .map_or_else(|| format!("{:?}", r), |s| s.to_string())
                }).map_position(|p| p.translate_position(&src[..]))
            })?;
            match opt {
                None => return Ok(None),
                Some(x) => x,
            }
        };
        // Remove the input we just consumed
        $src.split_to(removed_len);
        output
    } };
}
impl Decoder for LanguageServerDecoder {
    type Item = String;
    type Error = Box<::std::error::Error + Send + Sync>;
    fn decode(
        &mut self,
        src: &mut BytesMut
        ) -> Result<Option<Self::Item>, Self::Error>
    {
        loop {
            match self.state {
                // Nothing has been parsed so start looking for the
                // `Content-Length` header
                State::Nothing => {
                    let value = decode!(
                        (
                            skip_many(range(&b"\r\n"[..])),
                            range(&b"Content-Length: "[..]),
                            recognize(skip_many1(digit())),
                            many1(digit()),
                            range(&b"\r\n\r\n"[..]),
                        ).map(|t| t.2)
                            .and_then(|digits: &[u8]|{
                                str::from_utf8(digits)
                                    .unwrap()
                                    .parse::<usize>()
                            }),
                        src
                    );
                    // We now know how long the message is
                    // and we can transition to the next state 
                    self.state = State::ContentLength(value);
                }
                // We know how long the message is so try to extract that much
                // data
                State::ContentLength(message_length) => {
                    let message = decode!(
                        take(message_length).map(|bytes: &[u8]| bytes.to_owned()),
                        src
                    );
                    
                    // We parsed a message!
                    // Return to the starting state and look for the next
                    // `Content-Length` header
                    self.state = State::Nothing;
                    return Ok(Some(String::from_utf8(message)?));
                }
            }
        }
    }
}
As we can see this parser is segmented into two steps, first it extracts the length of the message from the Content-Length header and then it uses that length to extract a range of bytes before returning to the original state again. Since this is very simple example the code ends up pretty easy to follow but imagine if we needed 10 different states to cover our protocol.
impl Decoder for MyDecoder {
    type Item = ...;
    type Error = ...;
    fn decode(
        &mut self,
        src: &mut BytesMut
        ) -> Result<Option<Self::Item>, Self::Error> 
    {
        loop {
            match self.state {
                State::State1(..) => {
                    // ...
                    self.state = State::State2(..);
                }
                State::State2(..) => {
                    // ...
                    self.state = State::State3(..);
                }
                State::State3(..) => {
                    // ...
                    if x {
                        self.state = State::State4(..);
                    } else {
                        self.state = State::State5(..);
                    }
                }
                // ...
                State::StateX(..) => {
                    // ...
                    self.state = State::State1(..);
                }
            }
        }
    }
}
Suddenly the complexity has grown significantly, not only because we have more parsers in play but also because the state transitions may not be as trivial as they were in the first example. With enough care and tests we may make it a robust solution for parsing partial input in a non-blocking fashion but all this tedious and error prone work kept bothering me. Can we not do better than this?
Turns out we can.
Automatically encoding parser combinators into state machines
All parsers written using Combine 3.0 are now encoded as state machines which lets parsing pause when end of input is reached and later resume once more input are supplied from the caller.
By keeping all the state machine details hidden away from the task of defining the parsers we can construct.
So if you don’t need partial parsing you won’t even notice it is there and if you do use it you only need a few extra lines to specify the type of state as well as to store it somewhere between parse calls.
To avoid specifying the actual type of the state (which grows very large in order to handle all the states) the any_partial_state parser wrapper is provided to box and hide the state, but once impl Trait gets stabilized even that bit of boilerplate can (largely) be avoided.
We also need a bit of boilerplate to mark the stream as partial and finally we must actually remove the input range that has been parsed after the parser returns so that we do not try to parse the same input again when resuming the parsing.
Putting it all together, this lets us rewrite the first example into just a single parser which gets called by the small bit of boilerplate.
pub struct LanguageServerDecoder {
    state: AnyPartialState,
}
parser! {
    // To avoid writing out the state type we Box it using `AnyPartialState`
    // and the `any_partial_state` parser combinator
    type PartialState = AnyPartialState;
    // The syntax of the parser macro follows normal function declarations as
    // closely as possible but requires brackets (`[]`) around the type
    // parameters and around the where clause.
    // It also requires extra bit of information between the arguments and
    // the return type which marks what input the parser takes.
    // In this case the input is `I: RangeStream`.
    fn decode_parser['a, I]()(I) -> Vec<u8>
    where [ I: RangeStream<Item = u8, Range = &'a [u8]>, ]
    {
        let content_length =
            range(&b"Content-Length: "[..])
                .with(
                    recognize(skip_many1(digit()))
                        .and_then(|digits: &[u8]|
                            str::from_utf8(digits).unwrap().parse::<usize>()
                                // Convert the error from `.parse`
                                // into an error combine understands
                                .map_err(StreamErrorFor::<I>::other)
                        )
                );
        any_partial_state(
            (
                skip_many(range(&b"\r\n"[..])),
                content_length,
                range(&b"\r\n\r\n"[..]).map(|_| ()),
            )
            .then_partial(|&mut (_, message_length, _)| {
                take(message_length).map(|bytes: &[u8]| bytes.to_owned())
            })
        )
    }
}
impl Decoder for LanguageServerDecoder {
    type Item = String;
    type Error = Box<::std::error::Error + Send + Sync>;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let (opt, removed_len) = combine::stream::decode(
            message_parser(),
            // easy::Stream gives us nice error messages
            // (the same error messages that combine has had since its inception)
            // PartialStream lets the parsers know that more input should be
            // expected if end of input is unexpectedly reached
            easy::Stream(PartialStream(&src[..])),
            &mut self.state,
        ).map_err(|err| {
            // Since err contains references into `src` we must replace
            // these before we can return an error or call `split_to`
            err.map_range(|r| {
                str::from_utf8(r)
                    .ok()
                    .map_or_else(|| format!("{:?}", r), |s| s.to_string())
            }).map_position(|p| p.translate_position(&src[..]))
        })?;
        // Remove the input we just consumed.
        // Ideally this would be done automatically by the call to
        // `stream::decode` but it does unfortunately not work due
        // to lifetime issues (Non lexical lifetimes might fix it!)
        src.split_to(removed_len);
        Ok(match opt {
            // `None` means we did not have enough input and we require that the
            // caller of `decode` supply more before calling us again
            None => None,
            // `Some` means that a message was successfully decoded
            // (and that we are ready to start decoding the next message)
            Some(output) => Some(String::from_utf8(output)?),
        })
    }
}
Perhaps I am biased but that is nice improvement compared to the first parser. On a more complex parser the difference would only become larger as the boilerplate needed to deal with partial reads is only written once and is completely separated from the actual parsing!
Performance
The good news is that all of these changes has almost no effect on performance when not doing partial parsing! So any parser written without partial parsing in mind will work exactly as before and with comparable performance. On the other hand if you do use partial parsing you will see a bit of overhead as the parsers need to locate where in the parse-tree that parsing should resume. Once found, all parsers that follow will continue parsing with only a little bit of overhead(*).
(* Some caveats apply, I may do a follow up on the exact nature on how Combine’s parsers work now)
There are some bad news however. Unfortunately due to some regressions in rustc, Combine is not quite able to achieve the same performance as Nom. Only if we we throw away all error information in combine(*) can the parser achieve equivalent performance (as measured using the http benchmark parsers for each crate).
NOTE: These parsers all operate on full input.
# Benchmarks are run with `RUSTFLAGS="-C lto=fat -C codegen-units=1" CARGO_INCREMENTAL=0`
# Combine 3.0.0-beta.1
running 3 tests
test http_requests_large             ... bench:     389,923 ns/iter (+/- 173,676)
test http_requests_large_cheap_error ... bench:     283,130 ns/iter (+/- 20,828)
test http_requests_small             ... bench:      77,143 ns/iter (+/- 25,075)
# Combine 2.5
running 2 tests
test http_requests_large ... bench:     343,940 ns/iter (+/- 61,612)
test http_requests_small ... bench:      68,798 ns/iter (+/- 28,187)
# Combine 2.5 (using rustc-1.17.0)
running 2 tests
test http_requests_large ... bench:     235,066 ns/iter (+/- 89,012)
test http_requests_small ... bench:      46,440 ns/iter (+/- 3,898)
# Nom https://github.com/Geal/nom_benchmarks/tree/master/http/nom-http
running 2 tests
test bigger_test ... bench:     252,113 ns/iter (+/- 90,060)
test small_test  ... bench:      48,863 ns/iter (+/- 15,727)
(*) The _cheap_error test encodes errors as only ‘end of input’ and ‘unexpected input’ instead of the verbose and heap allocated errors that combine normally provides.
Addendum
I hope that this post has sparked your interest in using Combine or at least provided an interesting overview of the additional challenges of asynchronous parsing. I am hopeful that the small performance gap between Combine 3.0 and 2.5 can be removed with some additional profiling and that the regression in rustc can be fixed. Even if that is not the case or it takes some time, Combine 3 still provides respectable performance using the same trait based approach to parser composition as before.
Version 3.0.0-beta.1 was released a little less than a week ago with a plan to make very few breaking changes before a true 3.0 in a month or two. If any breaking changes happen between now and then they will either be trivial to fix or only break things if you dive deep into the internals of combine (working with Combine’s internals is very rarely necessary).
If you run into any trouble using the library feel free to open an issue or shoot a comment on Gitter.
EDIT: For more advanced usage of partial parsing I have made a pair of PRs for redis-rs which uses combine to parse asynchronous responses from redis. The first PR does a more straight translation of the original synchronous implementation and gets a slightly awkward API as a result of this whereas the second fully embraces tokio decoders to get a simpler API.
Thanks to @ordian, @cramertj and @dsprenkels for submitting fixes!
Special thanks to @Jim-Holmstroem for proofreading this post!