Created
March 31, 2019 09:19
-
-
Save robinp/fa14b51425f010454a5ea5b1c9f220ce to your computer and use it in GitHub Desktop.
LevelDB Stream merging and grouping
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{- Note: originally for treetide.com -} | |
{- Releasing under Apache 2.0 licence (https://www.apache.org/licenses/LICENSE-2.0), feel free to reuse accordingly. -} | |
{-# LANGUAGE TupleSections #-} | |
module Database.LevelDB.Streaming.Extended | |
( module X | |
, mergeStreams | |
, groupStream | |
) | |
where | |
import Control.Monad.IO.Class ( MonadIO ) | |
import Database.LevelDB.Streaming as X | |
-- | What 'mergeStreams' currently knows about one of its two inputs.
data MergeSide a s
  = NeedPull s
    -- ^ Nothing buffered; the next step must be pulled from state @s@.
  | Holding a s
    -- ^ Element @a@ was pulled but not yet emitted; the input resumes at @s@.
  | Exhausted
    -- ^ The input returned 'Done'; it is never stepped again.

-- | Merge two streams according to the comparison function over the entries.
--
-- At every step the buffered heads of both streams are compared and the
-- smaller one is emitted. On 'EQ' the entry from the first stream is emitted
-- first (the merge is left-biased). If both inputs are sorted by @cmp@, the
-- output is sorted as well.
--
-- Once an input returns 'Done' it is not stepped again, so correctness does
-- not depend on re-querying a finished upstream state being harmless.
-- Upstream 'Skip's are propagated as 'Skip's rather than looped over.
mergeStreams
  :: (MonadIO m)
  => (a -> a -> Ordering)
  -> Stream m a
  -> Stream m a
  -> Stream m a
mergeStreams cmp (Stream nextA sa0) (Stream nextB sb0) =
  Stream next ((,) <$> (NeedPull <$> sa0) <*> (NeedPull <$> sb0))
  where
    -- Settle the first input, then the second (the same order in which the
    -- inputs are initialised and stepped). A pulled element or a 'Done' is
    -- handled by re-entering 'next' right away. At most two pulls happen
    -- before something is yielded, so this recursion is bounded. An upstream
    -- 'Skip' is returned as a 'Skip' so one step never spins over skips.
    next (NeedPull sa, r) = do
      stepA <- nextA sa
      case stepA of
        Done -> next (Exhausted, r)
        Skip sa' -> return $! Skip (NeedPull sa', r)
        Yield a sa' -> next (Holding a sa', r)
    next (l, NeedPull sb) = do
      stepB <- nextB sb
      case stepB of
        Done -> next (l, Exhausted)
        Skip sb' -> return $! Skip (l, NeedPull sb')
        Yield b sb' -> next (l, Holding b sb')
    -- Both sides settled: emit the smaller head and keep the other buffered.
    -- Ties go to the first stream.
    next (l@(Holding a sa), r@(Holding b sb)) = return $! case cmp a b of
      GT -> Yield b (l, NeedPull sb)
      _ -> Yield a (NeedPull sa, r)
    -- Only one input left: drain it without stepping the finished one.
    next (Holding a sa, Exhausted) = return $! Yield a (NeedPull sa, Exhausted)
    next (Exhausted, Holding b sb) = return $! Yield b (Exhausted, NeedPull sb)
    next (Exhausted, Exhausted) = return Done
-- | Similar to 'groupBy' but for 'Stream'. Buffers consecutive entries while
-- the equality function returns true, and yields them together.
--
-- Differences from 'Data.List.groupBy' worth knowing:
--
-- * Each group is yielded in /reverse/ arrival order (most recent entry
--   first), because entries are consed onto the buffer.
--
-- * A new entry @a@ joins the current group when @f x a@ holds, where @x@ is
--   the most recently buffered entry, not the group's first entry. For an
--   equivalence relation this makes no difference.
--
-- Empty groups are never yielded. Once the upstream returns 'Done' and the
-- final group has been flushed, the upstream is not stepped again.
groupStream :: (MonadIO m) => (a -> a -> Bool) -> Stream m a -> Stream m [a]
groupStream f (Stream next0 s) = Stream next (([], ) . Just <$> s)
  where
    -- State: (current group, newest entry first; upstream state, or
    -- 'Nothing' once the upstream has returned 'Done' and been flushed).
    next (_, Nothing) = return Done
    next (accum, Just s0) = do
      step <- next0 s0
      return $! case step of
        Done -> case accum of
          [] -> Done
          -- Flush the last group and record that the upstream is finished,
          -- so the exhausted upstream state is never queried again.
          _ -> Yield accum ([], Nothing)
        Skip s0' -> Skip (accum, Just s0')
        Yield a s0' -> case accum of
          [] -> Skip ([a], Just s0')
          (x : _)
            | f x a -> Skip (a : accum, Just s0')
            | otherwise -> Yield accum ([a], Just s0')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment