-
Notifications
You must be signed in to change notification settings - Fork 1
/
wc.hs
171 lines (111 loc) · 4.33 KB
/
wc.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
{-# LANGUAGE TupleSections #-}
import Data.Char(isAlpha, toLower)
import Data.Function(on)
import Data.List(sortBy)
import System.Environment(getArgs, getProgName)
import System.Exit(die)
import Control.Monad.Par
import Control.Parallel(pseq)
import Control.Parallel.Strategies
import qualified Data.ByteString.Lazy.Char8 as B
import Data.Map(Map, keys, fromListWith, toList, unionWith, insert, insertWith, empty)
import Stream
{-
Name: Ecenaz Ozmen and Yefri Gaitan
Uni: eo2419 and yg2548
------------------------------
COMS 4995 003 Parallel Functional Programming
Final Project
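Word count over a text file: words are lower-cased, stripped of non-letters,
split into fixed-size chunks, and counted with a Par-monad stream pipeline.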
to compile:
stack ghc -- -O2 -Wall -threaded -rtsopts -eventlog wc
to run:
./wc big.txt seq +RTS -N8 -ls
-----
Use lts-14.5 as the "resolver" for the Haskell Tool Stack.
Your code should load under GHCi 8.6.5 with no warnings under -Wall, e.g.
:set -Wall
:l wc
-}
-- | Program entry point.
--
-- Expects exactly two command-line arguments: the path of the input file and
-- the mode, either @par@ or @seq@.
--
-- * @par@ runs 'pipeline' with a chunk size of 10000 words and prints the
--   number of entries in the resulting word-count list.
-- * @seq@ builds the same word-count table sequentially and prints the same
--   figure, so the two modes can be compared directly.
--
-- Any other argument list prints a usage message and exits via 'die'.
main :: IO ()
main = do
  args <- getArgs
  case args of
    [filename, "par"] -> do
      content <- B.readFile filename
      print $ length $ pipeline 10000 content
      -- TODO: print the ten most frequent words once sorting is wired back in.
    [filename, "seq"] -> do
      content <- B.readFile filename
      -- Same normalisation as the parallel path, counted in a single pass.
      let counts = fromListWith (+)
            [ (removeNonLetters w, 1 :: Int) | w <- B.words content ]
      print $ length $ toList counts
    _ -> do
      pn <- getProgName
      die $ "Usage: " ++ pn ++ " <filename> <par/seq>"
-- | Map phase: pair every word in the stream with an initial count of 1.
wcmap :: Stream B.ByteString -> Par (Stream (B.ByteString, Int))
wcmap ws = streamMap tagOne ws
  where
    -- Attach the unit count to a single word.
    tagOne w = (w, 1)
-- wcreduce :: Stream (Stream (B.ByteString, Int)) -> Par (Stream (Map B.ByteString Int))
-- wcreduce = streamMap ((runPar . streamFold (insertTuple) empty))
-- | Reduce phase for one stream: fold 'insertTuple' over the stream of
-- (word, count) pairs, starting from the empty map.
wcreduce :: Stream (B.ByteString, Int) -> Par (Map B.ByteString Int)
wcreduce pairs = streamFold insertTuple empty pairs
-- | Final reduce: merge a stream of per-chunk maps into one map, adding the
-- counts of keys that occur in more than one map.
finalreduce :: Stream (Map B.ByteString Int) -> Par (Map B.ByteString Int)
finalreduce maps = streamFold mergeCounts empty maps
  where
    -- Union of two count tables; shared keys have their counts summed.
    mergeCounts acc m = unionWith (+) acc m
-- | Add one (word, count) pair to a count table.
--
-- If the word is already present its existing count is increased by the new
-- count; otherwise the pair is inserted as-is. A plain 'insert' would replace
-- the old count and lose every earlier occurrence of the word.
insertTuple :: Map B.ByteString Int -> (B.ByteString, Int) -> Map B.ByteString Int
insertTuple m (k, v) = insertWith (+) k v m
-- | Split a list into consecutive sublists of at most @n@ elements each.
--
-- The last sublist may be shorter. An empty input yields no chunks.
-- A non-positive @n@ cannot make progress with 'splitAt' (it would produce
-- empty chunks forever), so in that case the whole input is returned as a
-- single chunk.
chunk :: Int -> [a] -> [[a]]
chunk _ [] = []
chunk n xs
  | n <= 0 = [xs]
  | otherwise = let (as, bs) = splitAt n xs in as : chunk n bs
-- | Normalise a token: lower-case every character, then keep only the
-- characters for which 'isAlpha' holds. Works character-by-character through
-- the Char8 ByteString interface; a token without letters becomes empty.
removeNonLetters :: B.ByteString -> B.ByteString
removeNonLetters token = B.filter isAlpha lowered
  where
    lowered = B.map toLower token
-- | Parallel word-count pipeline.
--
-- The input is split into whitespace-separated tokens with 'B.words', each
-- token is normalised with 'removeNonLetters', and the tokens are grouped
-- into chunks of @n@ with 'chunk'. Every chunk is turned into its own count
-- table, the tables are merged with 'finalreduce', and the merged table is
-- returned as a list of (word, count) pairs in 'toList' order.
--
-- Tokens that contain no letters normalise to the empty string and are
-- counted under that key.
pipeline :: Int -> B.ByteString -> [(B.ByteString, Int)]
pipeline n bs = runPar $ do
  chunks   <- streamFromList (chunk n (map removeNonLetters (B.words bs))) -- stream of word lists
  perChunk <- streamMap countChunk chunks                                  -- stream of maps
  toList <$> finalreduce perChunk
  where
    -- Count one chunk inside a single, self-contained 'runPar'. The inner
    -- streams are created and consumed within the same run, so no Stream
    -- value escapes one 'runPar' to be used in another; only the finished
    -- Map is returned.
    -- NOTE(review): Stream is presumably IVar-based (module not shown here);
    -- monad-par forbids returning IVars from 'runPar' — confirm.
    countChunk ws = runPar (streamFromList ws >>= wcmap >>= wcreduce)
{-
wcseq :: B.ByteString -> [(B.ByteString, Int)]
wcseq = seqMapReduce wcmap wcreduce . split 100
wcpar :: B.ByteString -> [(B.ByteString, Int)]
wcpar = finalreduce . parMapReduce rseq wcmap rseq parwcreduce . split 100
-- wc helper functions
--
wcmap :: [B.ByteString] -> [(B.ByteString, Int)]
wcmap = map (, 1)
parwcreduce :: [(B.ByteString, Int)] -> Map B.ByteString Int
parwcreduce = fromListWith (+)
finalreduce :: [Map B.ByteString Int] -> [(B.ByteString, Int)]
finalreduce = toList . unionsWith (+)
wcreduce :: [[(B.ByteString, Int)]] -> [(B.ByteString, Int)]
wcreduce = toList . fromListWith (+) . concat
-- map reduce library
--
seqMapReduce :: (a -> b) -> ([b] -> c) -> [a] -> c
seqMapReduce mf rf = rf . map mf
parMapReduce
:: Strategy b -- for mapping
-> (a -> b) -- map func
-> Strategy c -- for reducing
-> (b -> c) -- reduce func
-> [a] -- init list
-> [c]
parMapReduce mstrat mf rstrat rf xs =
mres `pseq` rres
where mres = map mf xs `using` parBuffer 100 mstrat
rres = map rf mres -- `using` parBuffer 100 rstrat -- [[(B.ByteString, Int)]]
-- Helper functions
--
sort :: Ord b => [(a,b)] -> [(a,b)]
sort = sortBy (flip compare `on` snd)
split :: Int -> B.ByteString -> [[B.ByteString]]
split n bs = chunk n $ map removeNonLetters $ B.words bs
chunk :: Int -> [a] -> [[a]]
chunk _ [] = []
chunk n xs = let (as,bs) = splitAt n xs in as : chunk n bs
removeNonLetters :: B.ByteString -> B.ByteString
removeNonLetters = B.filter isAlpha . B.map toLower
-}