1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
------------------------------------------------------------------------------
--- Library for distributed programming with ports.
--- [This paper](https://www.informatik.uni-kiel.de/~mh/papers/PPDP99.html)
--- contains a description of the basic ideas behind this library.
---
--- @author Michael Hanus
--- @version March 2021
------------------------------------------------------------------------------

module Network.Ports
  ( Port, openPort, send, doSend, openNamedPort
  , connectPort, connectPortRepeat, connectPortWait
  , ping, timeoutOnStream
  , openProcessPort, SP_Msg(..), choiceSPEP
  , newObject, newNamedObject, runNamedServer
  , hWaitForInputOrMsg, hWaitForInputsOrMsg
  ) where

import System.Process ( sleep )

import Network.CPNS   ( cpnsAlive, getPortInfo, registerPort )

--- The abstract datatype of ports for messages of type `a`.
--- The internal constructor for the port datatype is not visible to the user
--- (the module exports only the type name `Port`).
--- NOTE(review): the meaning of the `String` and the two `Int` components is
--- not documented here; presumably host name, socket number and port number
--- (compare `connectPortAtSocket`) -- confirm against the runtime system.

data Port a = InternalPort String Int Int a


--- Opens an internal port for communication.
--- @param p  - a free variable which will be constrained
---             with the port messages
--- @param ms - a free variable which will be instantiated
---             to the stream of incoming messages
openPort :: Port a -> [a] -> Bool
openPort p ms = opener $!! ms
 where
  -- The primitive is applied to the port via `$!` (head normal form),
  -- the resulting function is applied to the stream via `$!!` (normal form).
  opener = prim_openPort $! p

--- Externally implemented primitive behind `openPort`.
prim_openPort :: Port a -> [a] -> Bool
prim_openPort external


--- Sends a message to a port.
--- @param msg - the message to be sent (checked with `ensureNotFree` and
---              evaluated to normal form before it is passed on)
--- @param p   - the port receiving the message (must not be a free variable)
send :: a -> Port a -> Bool
send msg p = deliver $# p
 where
  -- The primitive already supplied with the fully evaluated message.
  deliver = prim_send $!! ensureNotFree msg

--- Externally implemented primitive behind `send`.
prim_send :: a -> Port a -> Bool
prim_send external

--- I/O action that sends a message to a port.
--- It solves the constraint `send msg p` by means of `doSolve`.
--- @param msg - the message to be sent
--- @param p   - the port receiving the message
doSend :: a -> Port a -> IO ()
doSend msg p = doSolve $ send msg p


--- A constraint which is satisfied after some amount of time
--- (currently only supported in TasteCurry).
--- This operation is implemented externally and is not contained in the
--- export list of this module.
--- @param n - the satisfaction time in milliseconds
after       :: Int -> Bool
after external

--- Checks whether a port is still reachable.
--- Both arguments must not be free variables (they are passed with `$#`).
--- @param n - the time to wait for reachability in milliseconds
--- @param p - a port to be checked for reachability
--- @return Nothing if port p is unreachable within n milliseconds,
---         or (Just m) if port p could be contacted within m milliseconds
ping :: Int -> Port _ -> IO (Maybe Int)
ping n p = pingWithin $# p
 where
  -- The primitive already supplied with the time limit.
  pingWithin = prim_ping $# n

--- Externally implemented primitive behind `ping`.
prim_ping :: Int -> Port _ -> IO (Maybe Int)
prim_ping external

--- Checks for instantiation of a stream within some amount of time.
--- The time limit must not be a free variable, whereas the stream is passed
--- to the primitive without any prior evaluation.
--- @param n   - the time to wait for instantiation in milliseconds
--- @param str - the stream to be checked for instantiation
---              (usually the stream of incoming messages at some port)
--- @return (Just str) if str is instantiated within n milliseconds,
---         or Nothing otherwise
timeoutOnStream :: Int -> [a] -> Maybe [a]
timeoutOnStream n str = watch str
 where
  -- The primitive already supplied with the time limit.
  watch = prim_timeoutOnStream $# n

--- Externally implemented primitive behind `timeoutOnStream`.
prim_timeoutOnStream :: Int -> [a] -> Maybe [a]
prim_timeoutOnStream external


--- A "stream port" is an adaptation of the port concept to model the
--- communication with bidirectional streams, i.e., a stream port is
--- a port connection to a bidirectional stream (e.g., opened by
--- `openProcessPort`) where the communication
--- is performed via the following stream port messages.
---
--- @cons SP_Put s     - write the argument s on the output stream
--- @cons SP_GetLine s - unify the argument s with the next text line of the
---                      input stream
--- @cons SP_GetChar c - unify the argument c with the next character of the
---                      input stream
--- @cons SP_EOF b     - unify the argument b with True if we are at the end
---                      of the input stream, otherwise with False
--- @cons SP_Close     - close the input/output streams

data SP_Msg =
     SP_Put     String -- write the argument on the output stream
   | SP_GetLine String -- unify the argument with the next text line of the
                       -- input stream
   | SP_GetChar Char   -- unify the argument with the next character of the
                       -- input stream
   | SP_EOF     Bool   -- unify the argument with True if we are at the end
                       -- of the input stream, otherwise with False
   | SP_Close          -- close the input/output streams


--- Opens a new connection to a process that executes a shell command.
--- The command string is evaluated to ground normal form (via `$##`)
--- before it is passed to the primitive.
--- @param cmd - the shell command to be executed
--- @return the output/input stream (represented as a stream port)
---         that is connected to the standard input/output of the process
---         performing the execution of cmd.
openProcessPort :: String -> IO (Port SP_Msg)
openProcessPort cmd = ($##) prim_openProcessPort cmd

--- Externally implemented primitive behind `openProcessPort`.
prim_openProcessPort :: String -> IO (Port SP_Msg)
prim_openProcessPort external


--- Opens an external port with a symbolic name.
--- The port is opened with unbound socket and port numbers and
--- afterwards registered under the given name via `registerPort`.
--- @param name - the symbolic name under which the port is accessible
---               (any string without occurrences of '@')
--- @return the stream of incoming messages at this port
openNamedPort :: String -> IO [_]
openNamedPort name = do
  msgs <- openPortOnSocket snr pnr   -- open new port (snr/pnr are unbound)
  registerPort name snr pnr
  return msgs
 where snr, pnr free

--- Waits for connection to an external port.
--- In contrast to <code>connectPort</code>, this action waits until
--- the external port has been registered with its symbolic name.
--- A connection attempt is considered as failed if the CPNS demon of the
--- host is not alive or if the socket number returned for the name is 0.
--- @param waittime   - the time to wait before retrying (in milliseconds;
---                     it is converted to whole seconds by integer division
---                     where a result of 0 is replaced by 1)
--- @param action     - I/O action to be executed before each wait cycle
--- @param retries    - number of retries before giving up
---                     (-1 = retry forever)
--- @param nameAtHost - the symbolic name of the external port
---                     (must be either of the form "name@machine" or "name"
---                      where the latter is a shorthand for "name@localhost")
--- @return Nothing (if connection is not possible within the given limits)
---         or (Just p) where p is a port with the symbolic name nameAtHost
connectPortRepeat :: Int -> IO _ -> Int -> String -> IO (Maybe (Port _))
connectPortRepeat waittime action retries nameAtHost = do
  -- check whether remote CPNS demon is alive:
  alive <- cpnsAlive host
  if alive then lookupAndConnect else tryAgain
 where
  -- split the symbolic name at the first '@'
  (name,atHost) = break (=='@') nameAtHost

  -- everything behind the '@', or "localhost" if there is no '@' part
  host = case atHost of
           []           -> "localhost"
           (_:hostname) -> hostname

  -- get remote socket/port numbers and connect if the port is registered
  lookupAndConnect = do
    (snr,pnr) <- getPortInfo name host
    if snr == 0
      then tryAgain
      else do port <- connectPortAtSocket snr pnr host
              return (Just port)

  -- give up if no retries are left, otherwise wait and start again
  tryAgain
    | retries == 0 = return Nothing
    | otherwise    = do
        action
        sleep (ms2s waittime)
        connectPortRepeat waittime action (decr retries) nameAtHost

  -- milliseconds to seconds where a result of 0 is replaced by 1
  ms2s n = case n `div` 1000 of
             0    -> 1
             secs -> secs

  -- negative retry counters (i.e., "retry forever") are never decremented
  decr n
    | n < 0     = n
    | otherwise = n - 1


--- Waits for connection to an external port and returns the connected port.
--- This action waits (possibly forever) until the external port is
--- registered. It is implemented by `connectPortRepeat` with a wait time
--- of 1000 milliseconds, no action between the attempts, and an
--- unlimited number of retries.
--- @param nameAtHost - the symbolic name of the external port
---                     (must be either of the form "name@host" or "name"
---                      where the latter is a shorthand for "name@localhost")
--- @return a port with the symbolic name nameAtHost
connectPortWait :: String -> IO (Port _)
connectPortWait nameAtHost = do
  Just port <- connectPortRepeat waitMillis noAction retryForever nameAtHost
  return port
 where
  waitMillis   = 1000
  noAction     = return ()
  retryForever = -1


--- Connects to an external port. The external port must be already
--- registered, otherwise an error is reported (which is the case if the
--- socket number returned by `getPortInfo` is 0).
--- @param nameAtHost - the symbolic name of the external port
---                     (must be either of the form "name@host" or "name"
---                      where the latter is a shorthand for "name@localhost")
--- @return a port with the symbolic name nameAtHost
connectPort :: String -> IO (Port _)
connectPort nameAtHost = do
  -- get remote socket/port numbers:
  (snr,pnr) <- getPortInfo name host
  if snr == 0
    then error ("connectPort: Port \""++name++"@"++host++
                "\" is not registered!")
    else connectPortAtSocket snr pnr host
 where
  -- split the symbolic name at the first '@'
  (name,atHost) = break (=='@') nameAtHost

  -- everything behind the '@', or "localhost" if there is no '@' part
  host = case atHost of
           []           -> "localhost"
           (_:hostname) -> hostname


--- This function implements a committed choice over the receiving
--- of messages via a stream port and an external port.
---
--- <EM>Note that the implementation of choiceSPEP works only with
--- Sicstus-Prolog 3.8.5 or higher (due to a bug in previous versions
--- of Sicstus-Prolog).</EM>
---
--- @param p  - a stream port (must not be a free variable)
--- @param ms - a stream of messages received via an external port
--- @return (Left s) if s is an input line received
---                  at the stream port (via SP_GetLine), or
---         (Right ms) if the stream ms is instantiated
---                    with at least one new message at the head

choiceSPEP :: Port SP_Msg -> [msg] -> Either String [msg]
choiceSPEP p ms = choose ms
 where
  -- The primitive already supplied with the stream port.
  choose = prim_choiceSPEP $# p

--- Externally implemented primitive behind `choiceSPEP`.
prim_choiceSPEP :: Port SP_Msg -> [msg] -> Either String [msg]
prim_choiceSPEP external


--- Creates a new object (of type <code>State -> [msg] -> Bool</code>)
--- with an initial state and a port to which messages for this object
--- can be sent.
--- The object is applied to the message stream of the newly opened port
--- where the spine of the stream and each message is ensured
--- to be not free.
---
--- @param object - an object template
--- @param state - the initial state of the object
--- @param port - a free variable which will be constrained to the port
---               for sending messages to the object
newObject :: Data msg => (state -> [msg] -> Bool) -> state -> Port msg -> Bool
newObject object state port =
  openPort port msgs &> object state checkedMsgs
 where
  msgs free
  checkedMsgs = map ensureNotFree (ensureSpine msgs)


--- Creates a new object (of type <code>State -> [msg] -> Bool</code>)
--- with a symbolic port name to which messages for this object can be sent.
--- The named port is opened by `openNamedPort` and the object applied to
--- the stream of incoming messages is solved by `doSolve`.
--- @param object - an object template
--- @param state - the initial state of the object
--- @param portname - the symbolic name under which the object's port is
---                   accessible (any string without occurrences of '@')
newNamedObject :: (state -> [_] -> Bool) -> state -> String -> IO ()
newNamedObject object state portname =
  openNamedPort portname >>= doSolve . object state

--- Runs a new server (of type <code>[msg] -> IO a</code>) on a named port
--- to which messages can be sent.
--- The named port is opened by `openNamedPort` and the server is applied
--- to the stream of incoming messages.
--- @param server - a server function that processes incoming messages
--- @param portname - the symbolic name under which the server's port is
---                   accessible (any string without occurrences of '@')
--- @return the result of the server action
runNamedServer :: ([_] -> IO a) -> String -> IO a
runNamedServer server portname = openNamedPort portname >>= server


------------------------------------------------------------------------------

-- The following predefined actions are not intended for application programs.
-- They are the basis to implement ports with symbolic names
-- via a name server (see library CPNS).


-- (openPortOnSocket snr pnr) is an action which opens an external port
-- on socket number snr with internal port number pnr and returns
-- the stream of incoming messages.
-- snr and pnr are allowed to be unbound: in this case they will be bound to
-- the numbers associated to a free port.
-- Both numbers are passed to the primitive with `$!`, i.e., they are only
-- evaluated to head normal form.

openPortOnSocket :: Int -> Int -> IO [_]
openPortOnSocket snr pnr = onSocket $! pnr
 where
  -- The primitive already supplied with the socket number.
  onSocket = prim_openPortOnSocket $! snr

-- Externally implemented primitive behind openPortOnSocket.
prim_openPortOnSocket :: Int -> Int -> IO [_]
prim_openPortOnSocket external


-- The internal function that reads a port stream lazily.
-- It is implemented externally and not contained in the export list
-- of this module.
-- NOTE(review): no definition in this module refers to it; presumably it is
-- required by the runtime system -- confirm before removing it.
basicServerLoop :: Port a -> [a]
basicServerLoop external


-- (connectPortAtSocket snr pnr host) is an action which returns a port that
-- has been opened at <host> with socket number <snr> and port number <pnr>.
-- The numbers must not be free variables (they are passed with `$#`) and
-- the host name is evaluated to ground normal form (via `$##`).

connectPortAtSocket :: Int -> Int -> String -> IO (Port _)
connectPortAtSocket snr pnr host = atPort $## host
 where
  -- The primitive supplied with the socket number ...
  atSocket = prim_connectPortAtSocket $# snr
  -- ... and then with the port number.
  atPort   = atSocket $# pnr

-- Externally implemented primitive behind connectPortAtSocket.
prim_connectPortAtSocket :: Int -> Int -> String -> IO (Port _)
prim_connectPortAtSocket external

--- Waits until input is available on a given handle or a message
--- in the message stream.
--- Usually, the message stream comes from an external port.
--- Thus, this operation implements a committed choice over receiving input
--- from an IO handle or an external port.
--- It is implemented by `hWaitForInputsOrMsg` applied to a singleton list
--- where a returned handle index is replaced by the given handle.
---
--- _Note that the implementation of this operation works only with
--- Sicstus-Prolog 3.8.5 or higher (due to a bug in previous versions
--- of Sicstus-Prolog)._
---
--- @param handle - a handle for an input stream
--- @param msgs   - a stream of messages received via an external port
--- @return (Left handle) if the handle has some data available, or
---         (Right msgs) if the stream msgs is instantiated
---                      with at least one new message at the head

hWaitForInputOrMsg :: Handle -> [msg] -> IO (Either Handle [msg])
hWaitForInputOrMsg handle msgs =
  hWaitForInputsOrMsg [handle] msgs >>=
    return . either (const (Left handle)) Right

--- Waits until input is available on some of the given handles or a message
--- in the message stream.
--- Usually, the message stream comes from an external port.
--- Thus, this operation implements a committed choice over receiving input
--- from IO handles or an external port.
--- Before the primitive is called, the list of handles is evaluated to
--- normal form where its spine and its elements are ensured to be not free.
---
--- _Note that the implementation of this operation works only with
--- Sicstus-Prolog 3.8.5 or higher (due to a bug in previous versions
--- of Sicstus-Prolog)._
---
--- @param handles - a list of handles for input streams
--- @param msgs    - a stream of messages received via an external port
--- @return (Left i) if (handles!!i) has some data available, or
---         (Right msgs) if the stream msgs is instantiated
---                      with at least one new message at the head

hWaitForInputsOrMsg :: [Handle] -> [msg] -> IO (Either Int [msg])
hWaitForInputsOrMsg handles msgs =
  checkedHandles `seq` prim_hWaitForInputsOrMsg handles msgs
 where
  -- Only evaluated for its effect of checking/evaluating the handle list.
  checkedHandles = normalForm (map ensureNotFree (ensureSpine handles))

--- Externally implemented primitive behind `hWaitForInputsOrMsg`.
prim_hWaitForInputsOrMsg :: [Handle] -> [msg] -> IO (Either Int [msg])
prim_hWaitForInputsOrMsg external

--  end of module Network.Ports