Upgrading to streamly 0.9.0

Also see the detailed changelog, which describes all the changes in this release.

The Streamly.Prelude module has been deprecated; equivalent functionality is now provided by the Streamly.Data.Stream, Streamly.Data.Stream.Prelude, and Streamly.Data.Fold modules. The new modules use a monomorphic Stream type instead of the polymorphic IsStream t type.

The Streamly.Data.Stream module and the Stream type are designed for writing high-performance fused pipelines that do not involve explicit recursion. For code that requires recursive function calls, the Streamly.Data.Stream.StreamK module and the StreamK type have been introduced; they provide a CPS-based stream implementation. The Stream and StreamK types can be interconverted easily. These changes were made to make performance robust rather than dependent on GHC rewrite rules, which can be fragile; for example, GHC 9.0.x broke a rewrite rule, and the breakage was not fixed until GHC 9.2.2. The split also gives the programmer more power over, and more transparency into, performance behavior.
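
For example, the two types can be interconverted with StreamK.fromStream and StreamK.toStream (using the qualified imports listed below):

>>> s = StreamK.fromStream (Stream.fromList [1,2,3])
>>> Stream.fold Fold.sum (StreamK.toStream s)
6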

Instead of using separate stream types for concurrent code, you now use explicit concurrent combinators with appropriate concurrency parameters. These combinators are available in the Streamly.Data.Stream.Prelude module. This change was made to let programmers control concurrent behavior more robustly and to reduce pitfalls.
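
For example, a serial mapM can be made concurrent by switching to parMapM and passing concurrency parameters in its config argument (a minimal sketch, using the qualified imports listed below):

>>> import Control.Concurrent (threadDelay)
>>> f x = threadDelay 100000 >> return (x * 2)
>>> Stream.fold Fold.toList $ Stream.parMapM (Stream.maxThreads 4 . Stream.ordered True) f (Stream.fromList [1,2,3])
[2,4,6]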

Old code can be adapted to the new modules with some changes; the following sections provide the details. Please open an issue if you cannot find a way to adapt your code to the new release.

Assume the following imports in the code snippets below:

>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.StreamK as StreamK
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Parser as Parser

Dependencies in cabal file

If the version bounds in your package allow streamly < 0.9, the build may fail if any of the following is true:

  • there is a dependency on streamly-core (module name conflicts)
  • there is a dependency on streamly-bytestring >= 0.2 (type conflict)

You can define and use a build flag in your package to allow older streamly versions:

flag old-streamly
  description: Allow streamly versions lower than 0.9
  manual: True
  default: False

library
  -- the flag conditional goes inside a component stanza
  -- (library/executable), not inside the flag stanza
  if flag(old-streamly)
    build-depends:
      streamly >= 0.8.3 && < 0.9
  else
    build-depends:
      streamly-core >= 0.1.0 && < 0.2,
      streamly >= 0.9.0 && < 0.10

The Stream and StreamK types

The following types are removed:

  • IsStream
  • ZipSerialM
  • ZipAsyncM
  • WSerialT
  • WAsyncT
  • SerialT
  • ParallelT
  • AsyncT
  • AheadT

In the new release, the Stream type is the primary stream type that you will use most of the time. You can think of it as a replacement for the SerialT type. However, it does not provide an Applicative or Monad instance.

The CrossStream type in Streamly.Internal.Data.Stream is a wrapper over the Stream type that supplies the Monad instance. However, see the "Stream Fusion" section in the Streamly.Data.Stream module for the limitations of the Stream type. The StreamK type and CrossStreamK (in Streamly.Internal.Data.Stream.StreamK) can be used to overcome the limitations of the Stream type.

If required, you can use the Template Haskell functions in Streamly.Data.Stream.MkType to create stream type wrappers (like ZipSerialM or WSerialT) with custom Applicative or Monad behavior. In general, though, try to avoid specific types and use explicit functions from the stream modules.

Here are the TH macro based recipes to define types equivalent to all the older types:

{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}

import Streamly.Data.Stream.MkType
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.Internal.Data.StreamK as StreamK
import Language.Haskell.TH

SerialT: use CrossStreamK from Streamly.Internal.Data.StreamK for equivalent Monad and other instances.

For WSerialT, create a newtype wrapper using the following monad bind operation:

bind = StreamK.bindWith StreamK.interleave

WSerialT requires StreamK, therefore you cannot generate the code directly using the TH macros. However, you can print the code generated by the TH macro and then tweak it, replacing the Stream type with StreamK:

expr <- runQ (mkCrossType "WSerialT" "bind" True)
putStrLn $ pprint expr

For AsyncT:

bind = flip (Stream.parConcatMap id)
$(mkCrossType "AsyncT" "bind" True)

For WAsyncT:

bind = flip (Stream.parConcatMap (Stream.interleaved True))
$(mkCrossType "WAsyncT" "bind" True)

For AheadT:

bind = flip (Stream.parConcatMap (Stream.ordered True))
$(mkCrossType "AheadT" "bind" True)

For ParallelT:

bind = flip (Stream.parConcatMap (Stream.eager True))
$(mkCrossType "ParallelT" "bind" True)

For ZipSerialM:

apply = Stream.zipWith ($)
$(mkZipType "ZipSerialM" "apply" False)

For ZipAsyncM (the concurrent apply requires the concurrency flag of the macro to be True):

apply = Stream.parApply id
$(mkZipType "ZipAsyncM" "apply" True)

The adapt function is not needed anymore. Other old combinators can be defined as follows:

>>> (.:) = StreamK.cons
>>> cons = StreamK.cons
>>> wSerial = StreamK.interleave
>>> serial = StreamK.append
>>> fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
>>> fromIndices f = fmap f $ Stream.enumerateFrom 0
>>> fromListM = Stream.sequence . Stream.fromList
>>> fromFoldable = StreamK.toStream . StreamK.fromFoldable
>>> fromFoldableM = Stream.sequence . fromFoldable
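
For example, with the definitions above:

>>> Stream.fold Fold.toList $ StreamK.toStream (1 .: (2 .: StreamK.nil))
[1,2]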

Stream folding functions

Explicit stream fold functions have been omitted from the new stream module. You can use the following equivalent definitions:

>>> the = Stream.fold Fold.the
>>> sum = Stream.fold Fold.sum
>>> product = Stream.fold Fold.product
>>> or = Stream.fold Fold.or
>>> null = Stream.fold Fold.null
>>> elemIndex a = Stream.fold (Fold.elemIndex a)
>>> elem a = Stream.fold (Fold.elem a)
>>> notElem a = Stream.fold (Fold.notElem a)
>>> minimumBy ordering = Stream.fold (Fold.minimumBy ordering)
>>> minimum = Stream.fold Fold.minimum
>>> maximumBy ordering = Stream.fold (Fold.maximumBy ordering)
>>> maximum = Stream.fold Fold.maximum
>>> mapM_ f = Stream.fold (Fold.drainMapM f)
>>> lookup a = Stream.fold (Fold.lookup a)
>>> length = Stream.fold Fold.length
>>> last = Stream.fold Fold.latest
>>> head = Stream.fold Fold.one
>>> foldlM' f a = Stream.fold (Fold.foldlM' f a)
>>> foldl1 f = Stream.fold (Fold.foldl1' f)
>>> foldl' f a = Stream.fold (Fold.foldl' f a)
>>> findIndex eq = Stream.fold (Fold.findIndex eq)
>>> find eq = Stream.fold (Fold.find eq)
>>> findM eq = Stream.fold (Fold.findM eq)
>>> drainWhile p = Stream.fold Fold.drain . Stream.takeWhile p
>>> drainN i = Stream.fold Fold.drain . Stream.take i
>>> drain = Stream.fold Fold.drain
>>> any p = Stream.fold (Fold.any p)
>>> and = Stream.fold Fold.and
>>> all p = Stream.fold (Fold.all p)
>>> (!!) i = Stream.fold (Fold.index i)
>>> tail = Streamly.Internal.Data.StreamK.tail
>>> init = Streamly.Internal.Data.StreamK.init
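
For example, with the definitions above:

>>> sum $ Stream.fromList [1..10 :: Int]
55
>>> head $ Stream.fromList [1..10 :: Int]
Just 1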

Mapping functions:

>>> map = fmap

Similarly, for scanning, use Stream.scan or Stream.postscan with an appropriate fold:

>>> scanl' f z = Stream.scan (Fold.foldl' f z)
>>> scanlM' f z = Stream.scan (Fold.foldlM' f z)
>>> postscanl' f z = Stream.postscan (Fold.foldl' f z)
>>> postscanlM' f z = Stream.postscan (Fold.foldlM' f z)
>>> scanl1' f = Stream.catMaybes . Stream.scan (Fold.foldl1' f)
>>> scanl1M' f = Stream.catMaybes . Stream.scan (Fold.foldlM1' f)
>>> concatMapWith = StreamK.concatMapWith
>>> concatFoldableWith f = Prelude.foldr f StreamK.nil
>>> concatMapFoldableWith f g = Prelude.foldr (f . g) StreamK.nil
>>> concatForFoldableWith f xs g = Prelude.foldr (f . g) StreamK.nil xs
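
For example, scanl' as defined above includes the initial accumulator in the output, while postscanl' does not:

>>> Stream.fold Fold.toList $ scanl' (+) 0 (Stream.fromList [1,2,3])
[0,1,3,6]
>>> Stream.fold Fold.toList $ postscanl' (+) 0 (Stream.fromList [1,2,3])
[1,3,6]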

Filters:

>>> deleteBy cmp x = Stream.scanMaybe (Fold.deleteBy cmp x)
>>> findIndices p = Stream.scanMaybe (Fold.findIndices p)
>>> elemIndices a = findIndices (== a)
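
For example, with findIndices as defined above:

>>> Stream.fold Fold.toList $ findIndices even (Stream.fromList [1,2,3,4])
[1,3]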

Custom implementations of most of these folds, scans and filters are also available in the Streamly.Internal.Data.Stream module.

Stream splitting and grouping functions

Stream splitting and grouping functions like splitOn, wordsBy, and groupsBy have been omitted from the new stream module as these can now be implemented using foldMany and an appropriate fold from the Streamly.Data.Fold module, or using parseMany and an appropriate parser from the Streamly.Data.Parser module.

>>> uniq = Stream.scanMaybe (Fold.uniqBy (==))
>>> splitWithSuffix p f = Stream.foldMany (Fold.takeEndBy p f)
>>> splitOn = Streamly.Internal.Data.Stream.splitOn
>>> splitOnSuffix p f = Stream.foldMany (Fold.takeEndBy_ p f)
>>> indexedR = Streamly.Internal.Data.Stream.indexedR
>>> groupsBy eq fld = Stream.parseMany (Parser.groupBy eq fld)
>>> groups = groupsBy (==)
>>> groupsByRolling = Streamly.Internal.Data.Stream.groupsRollingBy
>>> wordsBy p f = Stream.parseMany (Parser.wordBy p f)
>>> chunksOf n f = Stream.foldMany (Fold.take n f)
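
For example, with chunksOf as defined above:

>>> Stream.fold Fold.toList $ chunksOf 2 Fold.toList (Stream.fromList [1,2,3,4,5])
[[1,2],[3,4],[5]]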

Direct implementations of these are also available in Streamly.Internal.Data.Stream.

Concurrency

Earlier, concurrent and non-concurrent code used the same combinators; code was made concurrent by using different concurrent stream types such as AsyncT, ParallelT, etc. Now you use the same stream type everywhere and choose a concurrent combinator for concurrent behavior. For example, use mapM for serial behavior and parMapM for concurrent behavior. Concurrent combinators can be imported from the Streamly.Data.Stream.Prelude module; they are prefixed with par.

Concurrent combinators take a concurrency config argument that specifies the concurrency control parameters. The following combinators have the same meaning as before, except that they now set config parameters instead of being applied to the stream:

  • rate
  • maxRate
  • constRate
  • avgRate
  • minRate
  • maxThreads
  • maxBuffer
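
For example, to generate a rate-limited concurrent stream (a minimal sketch; avgRate takes the target number of yields per second):

>>> Stream.fold Fold.toList $ Stream.take 3 $ Stream.parRepeatM (Stream.avgRate 10) (return 42)
[42,42,42]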

A stream is evaluated asynchronously using parEval:

>>> :set -XFlexibleContexts
>>> mkAsync = Stream.parEval id

Earlier, consM was used to create an implicitly concurrent stream of actions. In the new release, an equivalent effect is achieved by using the serial consM to create a stream of actions and then explicitly evaluating it concurrently with Stream.parEval.

>>> consM = StreamK.consM
>>> (|:) = consM

Note that you will have to use StreamK.toStream to convert the StreamK to a Stream before using parEval on it.
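
For example (a minimal sketch, using a hypothetical delayed action):

>>> import Control.Concurrent (threadDelay)
>>> action n = threadDelay 100000 >> return n
>>> s = action 1 `consM` (action 2 `consM` StreamK.nil)
>>> Stream.fold Fold.toList $ Stream.parEval id (StreamK.toStream s)
[1,2]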

Existing generation combinators that can be implemented using new primitives:

>>> repeatM = Stream.parRepeatM id
>>> replicateM = Stream.parReplicateM id
>>> unfoldrM step = Stream.parEval id . Stream.unfoldrM step
>>> iterateM step = Stream.parEval id . Stream.iterateM step
>>> fromIndicesM f = Stream.parEval id $ Stream.mapM f $ Stream.enumerateFrom 0
>>> fromListM = Stream.parSequence id . Stream.fromList
>>> fromFoldableM = Stream.parSequence id . StreamK.toStream . StreamK.fromFoldable
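
For example, with replicateM as defined above:

>>> Stream.fold Fold.toList $ replicateM 3 (return 1)
[1,1,1]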

Existing transformation combinators that can be implemented using parEval:

>>> (|$.) f = f . Stream.parEval id
>>> (|&.) = flip (|$.)
>>> (|$) f = f . Stream.parEval id
>>> (|&) = flip (|$)
>>> sequence = Stream.parSequence id
>>> mapM = Stream.parMapM id

parList is used to evaluate multiple streams concurrently and combine the outputs. Existing combinators that can be implemented using parList:

>>> async x y = Stream.parList id [x, y]
>>> wAsync x y = Stream.parList (Stream.interleaved True) [x, y]
>>> parallel x y = Stream.parList (Stream.eager True) [x, y]
>>> ahead x y = Stream.parList (Stream.ordered True) [x, y]
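
For example, with ahead as defined above (ordered True preserves the order of the input streams in the output):

>>> s1 = Stream.fromList [1,2]
>>> s2 = Stream.fromList [3,4]
>>> Stream.fold Fold.toList $ ahead s1 s2
[1,2,3,4]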

Concurrent zipping and merging combinators:

>>> zipAsyncWithM = Stream.parZipWithM id
>>> zipAsyncWith = Stream.parZipWith id
>>> mergeAsyncByM = Stream.parMergeByM id
>>> mergeAsyncBy = Stream.parMergeBy id

The equivalent of concatMapWith with a concurrent combining operation is parConcatMap. The config argument of parConcatMap can specify an equivalent of the combining operation. Similarly, concurrent concatFoldableWith, concatMapFoldableWith, and concatForFoldableWith can also be expressed using parConcatMap.
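
For example, a concurrent concatMap-style expansion (a minimal sketch; ordered True keeps the output in input order):

>>> f x = Stream.fromList [x, x]
>>> Stream.fold Fold.toList $ Stream.parConcatMap (Stream.ordered True) f (Stream.fromList [1,2])
[1,1,2,2]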