@@ -17,7 +17,6 @@ import Control.Concurrent.Extra
1717import Control.Concurrent.STM.Stats (STM , atomically ,
1818 atomicallyNamed ,
1919 modifyTVar' , newTVarIO ,
20- putTMVar , readTMVar ,
2120 readTVarIO )
2221import Control.Exception
2322import Control.Monad
@@ -26,6 +25,7 @@ import Control.Monad.Trans.Class (lift)
2625import Control.Monad.Trans.Reader
2726import qualified Control.Monad.Trans.State.Strict as State
2827import Data.Dynamic
28+ import Data.Either
2929import Data.Foldable (for_ , traverse_ )
3030import Data.IORef.Extra
3131import Data.Maybe
@@ -39,10 +39,8 @@ import Development.IDE.Graph.Internal.Types
3939import qualified Focus
4040import qualified ListT
4141import qualified StmContainers.Map as SMap
42+ import System.IO.Unsafe
4243import System.Time.Extra (duration , sleep )
43- import UnliftIO (MonadUnliftIO (withRunInIO ),
44- newEmptyTMVarIO )
45- import qualified UnliftIO.Exception as UE
4644
4745#if MIN_VERSION_base(4,19,0)
4846import Data.Functor (unzip )
@@ -80,7 +78,7 @@ incDatabase db Nothing = do
8078updateDirty :: Monad m => Focus. Focus KeyDetails m ()
8179updateDirty = Focus. adjust $ \ (KeyDetails status rdeps) ->
8280 let status'
83- | Running _ _ x <- status = Dirty x
81+ | Running _ _ _ x <- status = Dirty x
8482 | Clean x <- status = Dirty (Just x)
8583 | otherwise = status
8684 in KeyDetails status' rdeps
@@ -90,60 +88,58 @@ build
9088 => Database -> Stack -> f key -> IO (f Key , f value )
9189-- build _ st k | traceShow ("build", st, k) False = undefined
9290build db stack keys = do
93- ! built <- runAIO $ builder db stack (fmap newKey keys)
91+ built <- runAIO $ do
92+ built <- builder db stack (fmap newKey keys)
93+ case built of
94+ Left clean -> return clean
95+ Right dirty -> liftIO dirty
9496 let (ids, vs) = unzip built
9597 pure (ids, fmap (asV . resultValue) vs)
9698 where
9799 asV :: Value -> value
98100 asV (Value x) = unwrapDynamic x
99101
100- data BuildArity = BuildUnary | BuildNary
101102-- | Build a list of keys and return their results.
102103-- If none of the keys are dirty, we can return the results immediately.
103104-- Otherwise, a blocking computation is returned *which must be evaluated asynchronously* to avoid deadlock.
104- builder :: (Traversable f ) => Database -> Stack -> f Key -> AIO (f (Key , Result ))
105+ builder
106+ :: Traversable f => Database -> Stack -> f Key -> AIO (Either (f (Key , Result )) (IO (f (Key , Result ))))
105107-- builder _ st kk | traceShow ("builder", st,kk) False = undefined
106- builder db stack keys = do
107- let ba = if length keys == 1 then BuildUnary else BuildNary
108- keyWaits <- for keys $ \ k -> builderOne ba db stack k
109- ! res <- for keyWaits $ \ (k, waitR) -> do
110- ! v<- liftIO waitR
111- return (k, v)
112- return res
113-
114- builderOne :: BuildArity -> Database -> Stack -> Key -> AIO (Key , IO Result )
115- builderOne ba db@ Database {.. } stack id = UE. mask $ \ restore -> do
116- current <- liftIO $ readTVarIO databaseStep
117- barrier <- newEmptyTMVarIO
118- (k, registerWaitResult) <- liftIO $ atomicallyNamed " builder" $ do
119- -- Spawn the id if needed
120- status <- SMap. lookup id databaseValues
121- val <-
122- let refreshRsult s = do
123- let putResult act = do
124- res <- act
125- liftIO $ atomically $ putTMVar barrier res
126- return res
127- let act = restore $ (case ba of
128- BuildNary ->
129- asyncWithCleanUp $
130- putResult $ refresh db stack id s
131- BuildUnary -> fmap return $ putResult $ refresh db stack id s)
132- `UE.onException` (UE. uninterruptibleMask_ $ liftIO (atomicallyNamed " builder - onException" (SMap. focus updateDirty id databaseValues)))
133- -- Mark the key as running
134- SMap. focus (updateStatus $ Running current (atomically $ readTMVar barrier) s) id databaseValues
135- return act
136- in case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
137- Dirty mbr -> refreshRsult mbr
138- Running step ba _mbr
139- | step /= current -> error $ " Inconsistent database state: key " ++ show id ++ " is marked Running at step " ++ show step ++ " but current step is " ++ show current
140- | memberStack id stack -> throw $ StackException stack
141- | otherwise -> pure . pure $ ba
142- Clean r -> pure . pure . pure $ r
143- -- force here might contains async exceptions from previous runs
144- pure (id , val)
145- waitR <- registerWaitResult
146- return (k, waitR)
108+ builder db@ Database {.. } stack keys = withRunInIO $ \ (RunInIO run) -> do
109+ -- Things that I need to force before my results are ready
110+ toForce <- liftIO $ newTVarIO []
111+ current <- liftIO $ readTVarIO databaseStep
112+ results <- liftIO $ for keys $ \ id ->
113+ -- Updating the status of all the dependencies atomically is not necessary.
114+ -- Therefore, run one transaction per dep. to avoid contention
115+ atomicallyNamed " builder" $ do
116+ -- Spawn the id if needed
117+ status <- SMap. lookup id databaseValues
118+ val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
119+ Clean r -> pure r
120+ Running _ force val _
121+ | memberStack id stack -> throw $ StackException stack
122+ | otherwise -> do
123+ modifyTVar' toForce (Wait force : )
124+ pure val
125+ Dirty s -> do
126+ let act = run (refresh db stack id s)
127+ (force, val) = splitIO act
128+ SMap. focus (updateStatus $ Running current force val s) id databaseValues
129+ modifyTVar' toForce (Spawn force: )
130+ pure val
131+
132+ pure (id , val)
133+
134+ toForceList <- liftIO $ readTVarIO toForce
135+ let waitAll = run $ waitConcurrently_ toForceList
136+ case toForceList of
137+ [] -> return $ Left results
138+ _ -> return $ Right $ do
139+ waitAll
140+ pure results
141+
142+
147143-- | isDirty
148144-- only dirty when it's build time is older than the changed time of one of its dependencies
149145isDirty :: Foldable t => Result -> t (a , Result ) -> Bool
@@ -159,27 +155,31 @@ isDirty me = any (\(_,dep) -> resultBuilt me < resultChanged dep)
159155refreshDeps :: KeySet -> Database -> Stack -> Key -> Result -> [KeySet ] -> AIO Result
160156refreshDeps visited db stack key result = \ case
161157 -- no more deps to refresh
162- [] -> compute' db stack key RunDependenciesSame (Just result)
158+ [] -> liftIO $ compute db stack key RunDependenciesSame (Just result)
163159 (dep: deps) -> do
164160 let newVisited = dep <> visited
165161 res <- builder db stack (toListKeySet (dep `differenceKeySet` visited))
166- if isDirty result res
162+ case res of
163+ Left res -> if isDirty result res
167164 -- restart the computation if any of the deps are dirty
168- then compute' db stack key RunDependenciesChanged (Just result)
165+ then liftIO $ compute db stack key RunDependenciesChanged (Just result)
169166 -- else kick the rest of the deps
170167 else refreshDeps newVisited db stack key result deps
168+ Right iores -> do
169+ res <- liftIO iores
170+ if isDirty result res
171+ then liftIO $ compute db stack key RunDependenciesChanged (Just result)
172+ else refreshDeps newVisited db stack key result deps
171173
172-
173- -- refresh :: Database -> Stack -> Key -> Maybe Result -> IO Result
174+ -- | Refresh a key:
174175-- refresh _ st k _ | traceShow ("refresh", st, k) False = undefined
175176refresh :: Database -> Stack -> Key -> Maybe Result -> AIO Result
176177refresh db stack key result = case (addStack key stack, result) of
177178 (Left e, _) -> throw e
178179 (Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> refreshDeps mempty db stack key me (reverse deps)
179- (Right stack, _) -> compute' db stack key RunDependenciesChanged result
180+ (Right stack, _) ->
181+ liftIO $ compute db stack key RunDependenciesChanged result
180182
181- compute' :: Database -> Stack -> Key -> RunMode -> Maybe Result -> AIO Result
182- compute' db stack key mode result = liftIO $ compute db stack key mode result
183183-- | Compute a key.
184184compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> IO Result
185185-- compute _ st k _ _ | traceShow ("compute", st, k) False = undefined
@@ -247,6 +247,18 @@ getKeysAndVisitAge db = do
247247 getAge Result {resultVisited = Step s} = curr - s
248248 return keysWithVisitAge
249249--------------------------------------------------------------------------------
250+ -- Lazy IO trick
251+
252+ data Box a = Box { fromBox :: a }
253+
254+ -- | Split an IO computation into an unsafe lazy value and a forcing computation
255+ splitIO :: IO a -> (IO () , a )
256+ splitIO act = do
257+ let act2 = Box <$> act
258+ let res = unsafePerformIO act2
259+ (void $ evaluate res, fromBox res)
260+
261+ --------------------------------------------------------------------------------
250262-- Reverse dependencies
251263
252264-- | Update the reverse dependencies of an Id
@@ -295,12 +307,8 @@ newtype AIO a = AIO { unAIO :: ReaderT (IORef [Async ()]) IO a }
295307-- | Run the monadic computation, cancelling all the spawned asyncs if an exception arises
296308runAIO :: AIO a -> IO a
297309runAIO (AIO act) = do
298- asyncsRef <- newIORef []
299- -- Log the exact exception (including async exceptions) before cleanup,
300- -- then rethrow to preserve previous semantics.
301- runReaderT act asyncsRef `onException` do
302- asyncs <- atomicModifyIORef' asyncsRef ([] ,)
303- cleanupAsync asyncs
310+ asyncs <- newIORef []
311+ runReaderT act asyncs `onException` cleanupAsync asyncs
304312
305313-- | Like 'async' but with built-in cancellation.
306314-- Returns an IO action to wait on the result.
@@ -311,22 +319,25 @@ asyncWithCleanUp act = do
311319 -- mask to make sure we keep track of the spawned async
312320 liftIO $ uninterruptibleMask $ \ restore -> do
313321 a <- async $ restore io
314- atomicModifyIORef'_ st (void a: )
322+ atomicModifyIORef'_ st (void a : )
315323 return $ wait a
316324
317325unliftAIO :: AIO a -> AIO (IO a )
318326unliftAIO act = do
319327 st <- AIO ask
320328 return $ runReaderT (unAIO act) st
321329
322- instance MonadUnliftIO AIO where
323- withRunInIO k = do
324- st <- AIO ask
325- liftIO $ k (\ aio -> runReaderT (unAIO aio) st)
330+ newtype RunInIO = RunInIO (forall a . AIO a -> IO a )
331+
332+ withRunInIO :: (RunInIO -> AIO b ) -> AIO b
333+ withRunInIO k = do
334+ st <- AIO ask
335+ k $ RunInIO (\ aio -> runReaderT (unAIO aio) st)
326336
327- cleanupAsync :: [Async a ] -> IO ()
337+ cleanupAsync :: IORef [Async a ] -> IO ()
328338-- mask to make sure we interrupt all the asyncs
329- cleanupAsync asyncs = uninterruptibleMask $ \ unmask -> do
339+ cleanupAsync ref = uninterruptibleMask $ \ unmask -> do
340+ asyncs <- atomicModifyIORef' ref ([] ,)
330341 -- interrupt all the asyncs without waiting
331342 mapM_ (\ a -> throwTo (asyncThreadId a) AsyncCancelled ) asyncs
332343 -- Wait until all the asyncs are done
@@ -337,3 +348,32 @@ cleanupAsync asyncs = uninterruptibleMask $ \unmask -> do
337348 traceM " cleanupAsync: waiting for asyncs to finish"
338349 withAsync warnIfTakingTooLong $ \ _ ->
339350 mapM_ waitCatch asyncs
351+
352+ data Wait
353+ = Wait { justWait :: ! (IO () )}
354+ | Spawn { justWait :: ! (IO () )}
355+
356+ fmapWait :: (IO () -> IO () ) -> Wait -> Wait
357+ fmapWait f (Wait io) = Wait (f io)
358+ fmapWait f (Spawn io) = Spawn (f io)
359+
360+ waitOrSpawn :: Wait -> IO (Either (IO () ) (Async () ))
361+ waitOrSpawn (Wait io) = pure $ Left io
362+ waitOrSpawn (Spawn io) = Right <$> async io
363+
364+ waitConcurrently_ :: [Wait ] -> AIO ()
365+ waitConcurrently_ [] = pure ()
366+ waitConcurrently_ [one] = liftIO $ justWait one
367+ waitConcurrently_ many = do
368+ ref <- AIO ask
369+ -- spawn the async computations.
370+ -- mask to make sure we keep track of all the asyncs.
371+ (asyncs, syncs) <- liftIO $ uninterruptibleMask $ \ unmask -> do
372+ waits <- liftIO $ traverse (waitOrSpawn . fmapWait unmask) many
373+ let (syncs, asyncs) = partitionEithers waits
374+ liftIO $ atomicModifyIORef'_ ref (asyncs ++ )
375+ return (asyncs, syncs)
376+ -- work on the sync computations
377+ liftIO $ sequence_ syncs
378+ -- wait for the async computations before returning
379+ liftIO $ traverse_ wait asyncs
0 commit comments