Skip to content

Commit

Permalink
Kill worker on PeerClosed
Browse files Browse the repository at this point in the history
  • Loading branch information
bgamari committed Aug 3, 2023
1 parent bf60ed2 commit 71bf33b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
21 changes: 21 additions & 0 deletions warp/Network/Wai/Handler/Warp/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
module Network.Wai.Handler.Warp.Run where

import Control.Arrow (first)
import Control.Concurrent
import qualified Control.Exception
import Control.Exception (allowInterrupt)
import qualified Data.ByteString as S
Expand All @@ -21,6 +22,12 @@ import Network.Socket (gracefulClose)
#endif
import Network.Socket.BufferPool
import qualified Network.Socket.ByteString as Sock
#if MIN_VERSION_base(4,18,0)
-- For evtPeerClosed
import Network.Socket (withFdSocket)
import GHC.Event
import System.Posix.Types (Fd(Fd))
#endif
import Network.Wai
import System.Environment (lookupEnv)
import System.IO.Error (ioeGetErrorType)
Expand Down Expand Up @@ -59,6 +66,14 @@ socketConnection _ s = do
bufferPool <- newBufferPool 2048 16384
writeBuffer <- createWriteBuffer 16384
writeBufferRef <- newIORef writeBuffer
#if MIN_VERSION_base(4,18,0)
let registerPeerClosedCb = Just $ \cb -> withFdSocket s $ \fd -> do
Just mgr <- getSystemEventManager
_ <- registerFd mgr (\ _ _ -> cb) (Fd fd) evtPeerClosed OneShot
return ()
#else
let registerPeerClosedCb = Nothing
#endif
isH2 <- newIORef False -- HTTP/1.x
return Connection {
connSendMany = Sock.sendMany s
Expand All @@ -80,6 +95,7 @@ socketConnection _ s = do
, connRecvBuf = \_ _ -> return True -- obsoleted
, connWriteBuffer = writeBufferRef
, connHTTP2 = isH2
, connRegisterPeerClosedCb = registerPeerClosedCb
}
where
receive' sock pool = UnliftIO.handleIO handler $ receive sock pool
Expand Down Expand Up @@ -322,6 +338,11 @@ fork set mkConn addr app counter ii = settingsFork set $ \unmask ->
-- We need to register a timeout handler for this thread, and
-- cancel that handler as soon as we exit.
serve unmask (conn, transport) = UnliftIO.bracket register cancel $ \th -> do
case connRegisterPeerClosedCb conn of
Just registerCb -> do tid <- myThreadId
registerCb (throwTo tid PeerClosedException)
Nothing -> return ()

-- We now have fully registered a connection close handler in
-- the case of all exceptions, so it is safe to once again
-- allow async exceptions.
Expand Down
10 changes: 10 additions & 0 deletions warp/Network/Wai/Handler/Warp/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ instance UnliftIO.Exception ExceptionInsideResponseBody

----------------------------------------------------------------

-- | Exception thrown when the iniating client of a connection being handled by
-- a worker closes its end of the connection.
data PeerClosedException = PeerClosedException
deriving (Show)

instance UnliftIO.Exception PeerClosedException

----------------------------------------------------------------

-- | Data type to abstract file identifiers.
-- On Unix, a file descriptor would be specified to make use of
-- the file descriptor cache.
Expand Down Expand Up @@ -125,6 +134,7 @@ data Connection = Connection {
, connWriteBuffer :: IORef WriteBuffer
-- | Is this connection HTTP/2?
, connHTTP2 :: IORef Bool
, connRegisterPeerClosedCb :: Maybe (IO () -> IO ())
}

getConnHTTP2 :: Connection -> IO Bool
Expand Down

0 comments on commit 71bf33b

Please sign in to comment.