------------------------------------------------------------------------------
--- Library for distributed programming with ports.
--- [This paper](https://www.informatik.uni-kiel.de/~mh/papers/PPDP99.html)
--- contains a description of the basic ideas behind this library.
---
--- @author Michael Hanus
--- @version March 2021
------------------------------------------------------------------------------

module Network.Ports
  ( Port, openPort, send, doSend, openNamedPort
  , connectPort, connectPortRepeat, connectPortWait
  , ping, timeoutOnStream
  , openProcessPort, SP_Msg(..), choiceSPEP
  , newObject, newNamedObject, runNamedServer
  , hWaitForInputOrMsg, hWaitForInputsOrMsg
  ) where

-- `Handle` occurs in the signatures of hWaitForInputOrMsg and
-- hWaitForInputsOrMsg, so it has to be imported explicitly.
import System.IO      ( Handle )
import System.Process ( sleep )
import Network.CPNS   ( cpnsAlive, getPortInfo, registerPort )

--- The internal constructor for the port datatype is not visible to the user.
data Port a = InternalPort String Int Int a

--- Opens an internal port for communication.
--- @param p  - a free variable which will be constrained
---             with the port messages
--- @param ms - a free variable which will be instantiated
---             to the stream of incoming messages
openPort :: Port a -> [a] -> Bool
openPort p ms = (prim_openPort $! p) $!! ms

prim_openPort :: Port a -> [a] -> Bool
prim_openPort external

--- Sends a message to a port.
--- The message is passed through `ensureNotFree` before it is handed
--- to the primitive send operation.
--- @param msg - the message to be sent
--- @param p   - the port to which the message is sent
send :: a -> Port a -> Bool
send msg p = (prim_send $!! ensureNotFree msg) $# p

prim_send :: a -> Port a -> Bool
prim_send external

--- I/O action that sends a message to a port.
--- @param msg - the message to be sent
--- @param p   - the port to which the message is sent
doSend :: a -> Port a -> IO ()
doSend msg p = doSolve (send msg p)

--- A constraint which is satisfied after some amount of time
--- (currently only supported in TasteCurry).
--- @param n - the satisfaction time in milliseconds
after :: Int -> Bool
after external

--- Checks whether port p is still reachable.
--- @param n - the time to wait for reachability in milliseconds
--- @param p - a port to be checked for reachability
--- @return Nothing if port p is unreachable within n milliseconds,
---         or (Just m) if port p could be contacted within m milliseconds
ping :: Int -> Port _ -> IO (Maybe Int)
ping n p = (prim_ping $# n) $# p

prim_ping :: Int -> Port _ -> IO (Maybe Int)
prim_ping external

--- Checks whether a stream becomes instantiated within a given time span.
--- @param timeout - the time to wait for instantiation in milliseconds
--- @param stream  - the stream to be checked for instantiation
---                  (usually the stream of incoming messages at some port)
--- @return (Just stream) if the stream is instantiated within
---         the given number of milliseconds, or Nothing otherwise
timeoutOnStream :: Int -> [a] -> Maybe [a]
timeoutOnStream timeout stream = (prim_timeoutOnStream $# timeout) stream

prim_timeoutOnStream :: Int -> [a] -> Maybe [a]
prim_timeoutOnStream external

--- A "stream port" is an adaption of the port concept to model the
--- communication with bidirectional streams, i.e., a stream port is
--- a port connection to a bidirectional stream (e.g., opened by
--- openProcessPort) where the communication
--- is performed via the following stream port messages.
---
--- @cons SP_Put s     - write the argument s on the output stream
--- @cons SP_GetLine s - unify the argument s with the next text line of the
---                      input stream
--- @cons SP_GetChar c - unify the argument c with the next character of the
---                      input stream
--- @cons SP_EOF b     - unify the argument b with True if we are at the end
---                      of the input stream, otherwise with False
--- @cons SP_Close     - close the input/output streams
data SP_Msg
  = SP_Put     String  -- write the argument on the output stream
  | SP_GetLine String  -- unify the argument with the next text line of the
                       -- input stream
  | SP_GetChar Char    -- unify the argument with the next character of the
                       -- input stream
  | SP_EOF     Bool    -- unify the argument with True if we are at the end
                       -- of the input stream, otherwise with False
  | SP_Close           -- close the input/output streams

--- Opens a new connection to a process that executes a shell command.
--- @param cmd - the shell command to be executed
--- @return the output/input stream (represented as a stream port)
---         that is connected to the standard input/output of the process
---         performing the execution of cmd.
openProcessPort :: String -> IO (Port SP_Msg)
openProcessPort cmd = prim_openProcessPort $## cmd

prim_openProcessPort :: String -> IO (Port SP_Msg)
prim_openProcessPort external

--- Opens an external port with a symbolic name.
--- The port is opened with unbound socket and port numbers, and the
--- numbers are then registered under the given name via `registerPort`.
--- @param name - the symbolic name under which the port is accessible
---               (any string without occurrences of '@')
--- @return the stream of incoming messages at this port
openNamedPort :: String -> IO [_]
openNamedPort name = do
  msgs <- openPortOnSocket snr pnr  -- open new port
  registerPort name snr pnr
  return msgs
 where
  snr, pnr free

--- Waits for connection to an external port.
--- In contrast to `connectPort`, this action waits until
--- the external port has been registered with its symbolic name.
--- @param waittime   - the time to wait before retrying (in milliseconds);
---                     it is divided by 1000 (integer division, minimum 1)
---                     before being passed to `sleep`
--- @param action     - I/O action to be executed before each wait cycle
--- @param retries    - number of retries before giving up
---                     (-1 = retry forever)
--- @param nameAtHost - the symbolic name of the external port
---                     (must be either of the form "name@machine" or "name"
---                     where the latter is a shorthand for "name@localhost")
--- @return Nothing (if connection is not possible within the given limits)
---         or (Just p) where p is a port with the symbolic name nameAtHost
connectPortRepeat :: Int -> IO _ -> Int -> String -> IO (Maybe (Port _))
connectPortRepeat waittime action retries nameAtHost = do
  let (name, atHost) = break (== '@') nameAtHost
      host           = if atHost == "" then "localhost" else tail atHost
  -- check whether remote CPNS demon is alive:
  reachable <- cpnsAlive host
  if reachable
    then do
      -- get remote socket/port numbers:
      (snr, pnr) <- getPortInfo name host
      if snr /= 0
        then do port <- connectPortAtSocket snr pnr host
                return (Just port)
        else retry
    else retry
 where
  -- Gives up if no retries are left; otherwise runs the action, waits,
  -- and starts the next attempt with a decremented retry counter.
  retry
    | retries == 0 = return Nothing
    | otherwise    = do
        action
        sleep (toSeconds waittime)
        connectPortRepeat waittime action (remaining retries) nameAtHost

  -- Milliseconds to whole seconds, but never less than one.
  toSeconds ms
    | secs == 0 = 1
    | otherwise = secs
   where
    secs = ms `div` 1000

  -- A negative counter means "retry forever" and is left unchanged.
  remaining n
    | n < 0     = n
    | otherwise = n - 1

--- Waits for connection to an external port and returns the connected port.
--- This action waits (possibly forever) until the external port is
--- registered: it calls `connectPortRepeat` with a waiting time of 1000
--- milliseconds and an unlimited number of retries.
--- @param nameAtHost - the symbolic name of the external port
---                     (must be either of the form "name@host" or "name"
---                     where the latter is a shorthand for "name@localhost")
--- @return a port with the symbolic name nameAtHost
connectPortWait :: String -> IO (Port _)
connectPortWait nameAtHost = do
  Just connected <- connectPortRepeat 1000 (return ()) (-1) nameAtHost
  return connected

--- Connects to an external port. The external port must be already
--- registered, otherwise an error is reported.
--- @param nameAtHost - the symbolic name of the external port
---                     (must be either of the form "name@host" or "name"
---                     where the latter is a shorthand for "name@localhost")
--- @return a port with the symbolic name nameAtHost
connectPort :: String -> IO (Port _)
connectPort nameAtHost = do
  let (name, atHost) = break (== '@') nameAtHost
      host           = if atHost == "" then "localhost" else tail atHost
  -- get remote socket/port numbers:
  (snr, pnr) <- getPortInfo name host
  -- a socket number of 0 is treated as "port not registered"
  if snr == 0
    then error ("connectPort: Port \"" ++ name ++ "@" ++ host ++
                "\" is not registered!")
    else connectPortAtSocket snr pnr host

--- This function implements a committed choice over the receiving
--- of messages via a stream port and an external port.
---
--- _Note that the implementation of choiceSPEP works only with
--- Sicstus-Prolog 3.8.5 or higher (due to a bug in previous versions
--- of Sicstus-Prolog)._
---
--- @param sp - a stream port
--- @param ms - a stream of messages received via an external port
--- @return (Left s) if s is an input line received
---                  at the stream port (via SP_GetLine) or
---
---         (Right ms) if the stream ms is instantiated
---                    with at least one new message at the head
choiceSPEP :: Port SP_Msg -> [msg] -> Either String [msg]
choiceSPEP sp ms = (prim_choiceSPEP $# sp) ms

prim_choiceSPEP :: Port SP_Msg -> [msg] -> Either String [msg]
prim_choiceSPEP external

--- Creates a new object (of type `State -> [msg] -> Bool`)
--- with an initial state and a port to which messages for this object
--- can be sent.
---
--- @param object - an object template
--- @param state  - the initial state of the object
--- @param port   - a free variable which will be constrained to the port
---                 for sending messages to the object
newObject :: Data msg => (state -> [msg] -> Bool) -> state -> Port msg -> Bool
newObject object state port =
  openPort port msgs &> object state checkedMsgs
 where
  msgs free
  -- the object only sees the message stream through
  -- ensureSpine / ensureNotFree
  checkedMsgs = map ensureNotFree (ensureSpine msgs)

--- Creates a new object (of type `State -> [msg] -> Bool`)
--- with a symbolic port name to which messages for this object can be sent.
--- @param object   - an object template
--- @param state    - the initial state of the object
--- @param portname - the symbolic name under which the object's port is
---                   accessible (any string without occurrences of '@')
newNamedObject :: (state -> [_] -> Bool) -> state -> String -> IO ()
newNamedObject object state portname =
  openNamedPort portname >>= \msgs -> doSolve (object state msgs)

--- Runs a new server (of type `[msg] -> IO a`) on a named port
--- to which messages can be sent.
--- @param server   - a server function that processes incoming messages
--- @param portname - the symbolic name under which the server's port is
---                   accessible (any string without occurrences of '@')
runNamedServer :: ([_] -> IO a) -> String -> IO a
runNamedServer server portname = openNamedPort portname >>= server

------------------------------------------------------------------------------
-- The following predefined actions are not intended for application programs.
-- They are the basis to implement ports with symbolic names
-- via a name server (see library CPNS).

-- (openPortOnSocket snr pnr) is an action which opens an external port
-- on socket number snr with internal port number pnr and returns
-- the stream of incoming messages.
-- snr and pnr are allowed to be unbound: in this case they will be bound to the
-- numbers associated to a free port
openPortOnSocket :: Int -> Int -> IO [_]
openPortOnSocket snr pnr = (prim_openPortOnSocket $! snr) $! pnr

prim_openPortOnSocket :: Int -> Int -> IO [_]
prim_openPortOnSocket external

-- The internal function that reads a port stream lazily.
basicServerLoop :: Port a -> [a]
basicServerLoop external

-- (connectPortAtSocket snr pnr host) is an action which returns a port that
-- has been opened at <host> with socket number <snr> and port number <pnr>
connectPortAtSocket :: Int -> Int -> String -> IO (Port _)
connectPortAtSocket snr pnr host =
  ((prim_connectPortAtSocket $# snr) $# pnr) $## host

prim_connectPortAtSocket :: Int -> Int -> String -> IO (Port _)
prim_connectPortAtSocket external

--- Waits until input is available on a given handle or a message
--- in the message stream.
--- Usually, the message stream comes from an external port.
--- Thus, this operation implements a committed choice over receiving input
--- from an IO handle or an external port.
--- It is implemented by calling `hWaitForInputsOrMsg` with a
--- one-element handle list.
---
--- _Note that the implementation of this operation works only with
--- Sicstus-Prolog 3.8.5 or higher (due to a bug in previous versions
--- of Sicstus-Prolog)._
---
--- @param handle - a handle for an input stream
--- @param msgs   - a stream of messages received via an external port
--- @return (Left handle) if the handle has some data available
---         (Right msgs) if the stream msgs is instantiated
---                      with at least one new message at the head
hWaitForInputOrMsg :: Handle -> [msg] -> IO (Either Handle [msg])
hWaitForInputOrMsg handle msgs = do
  ready <- hWaitForInputsOrMsg [handle] msgs
  -- any index result is mapped back to the single handle
  return (either (const (Left handle)) Right ready)

--- Waits until input is available on some of the given handles or a message
--- in the message stream.
--- Usually, the message stream comes from an external port.
--- Thus, this operation implements a committed choice over receiving input
--- from IO handles or an external port.
---
--- _Note that the implementation of this operation works only with
--- Sicstus-Prolog 3.8.5 or higher (due to a bug in previous versions
--- of Sicstus-Prolog)._
---
--- @param handles - a list of handles for input streams
--- @param msgs    - a stream of messages received via an external port
--- @return (Left i) if (handles!!i) has some data available
---         (Right msgs) if the stream msgs is instantiated
---                      with at least one new message at the head
hWaitForInputsOrMsg :: [Handle] -> [msg] -> IO (Either Int [msg])
hWaitForInputsOrMsg handles msgs =
  checkedHandles `seq` prim_hWaitForInputsOrMsg handles msgs
 where
  -- the handle list is normalized (ensureSpine / ensureNotFree / normalForm)
  -- before the primitive operation is invoked
  checkedHandles = normalForm (map ensureNotFree (ensureSpine handles))

prim_hWaitForInputsOrMsg :: [Handle] -> [msg] -> IO (Either Int [msg])
prim_hWaitForInputsOrMsg external

-- end of module Network.Ports