This vignette is not needed for using the async package. It may be useful for developers of async methods, and certainly for people working on the async package itself. Make sure you read the README and also the `?deferred` manual page before reading this vignette.

If you think that this document conflicts with the source code, please report an issue in the async issue tracker.
The async event loop is modeled after the libuv event loop. It is possible that we’ll switch to the libuv event loop later.
The essential feature of the event loop is that it polls for all I/O in a single `processx::poll()` call. This allows the quickest possible processing of I/O, as `poll()` returns as soon as an event is available for processing.
Ideally, a single `poll()` call should return all available events. This is indeed the case on Unix, but currently not on Windows. To improve this on Windows, we would need to use `GetQueuedCompletionStatusEx()` in processx, which can remove multiple packets from the IOCP. HTTP is already handled properly on Windows as well, because we use `select()`, which returns all events.
`processx::poll()` is able to poll for various types of I/O (see the sketch below):

* processx processes (not used in async currently),
* processx connections; async uses these to poll for all processes, i.e. `run_process()`, `run_r_process()` and `call_function()`,
* curl file descriptors; async uses these to poll for HTTP.
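For illustration, here is a minimal sketch of polling a process and curl sockets in one call. It assumes a Unix-like system (for the `sleep` command) and recent processx and curl versions; `processx::curl_fds()` wraps the file descriptors returned by `curl::multi_fdset()` into a pollable:

```r
library(processx)
library(curl)

# A background process and a pending HTTP request, polled in one call.
proc <- process$new("sleep", "1", poll_connection = TRUE)

pool <- new_pool()
multi_add(new_handle(url = "https://example.com"), pool = pool)
fds <- multi_fdset(pool)

# poll() waits on all pollables at once and returns as soon as either
# the process or one of the curl sockets has an event (or at 1000 ms).
poll(list(proc, curl_fds(fds)), ms = 1000)
```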
`processx::poll()` is interruptible on all platforms. This is implemented by polling in a loop, for a short amount of time only (~200 ms), and then checking for interrupts.
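The real loop lives in processx's C code; the following R sketch only illustrates the idea:

```r
# Illustrative only: poll in ~200 ms slices, so that control returns
# to R between slices, where a pending interrupt can be delivered.
poll_interruptible <- function(pollables, total_ms) {
  repeat {
    slice <- min(200, total_ms)
    ret <- processx::poll(pollables, ms = slice)
    # Any non-timeout status means a real event: return it.
    if (any(unlist(ret) != "timeout")) return(ret)
    total_ms <- total_ms - slice
    if (total_ms <= 0) return(ret)
    # An interrupt condition raised here aborts the wait.
  }
}
```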
Processes are polled using their "poll connections". This is an extra connection, in addition to standard output and error. For `run_process()` and `run_r_process()` it is used to poll for the termination of the process. For the worker pool, i.e. `call_function()`, it is used to communicate back to the main process, so it can be used to poll for the completion of the remote function call.
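As a hedged example with a plain process (the printed result is approximate; the `process` entry of the poll result corresponds to the poll connection):

```r
library(processx)

# stdout/stderr are not piped here, so only the poll connection
# (the "process" entry below) can fire; it signals termination.
p <- process$new("sleep", "1", poll_connection = TRUE)
poll(list(p), ms = 2000)
#> [[1]]
#>   output    error  process
#> "nopipe" "nopipe"  "ready"
```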
On Unix, polling processx connections simply uses file descriptors and the `poll(2)` system call.
On Windows, polling processx connections uses overlapped I/O and IOCPs. Since on Windows you cannot poll without reading, all connections are also buffered.
Implementing HTTP polling is significantly more difficult than only polling processx connections. We list the most significant issues and their workarounds here.
The curl package implements an HTTP client; it uses libcurl internally. In async, we do not intend to re-implement an HTTP client, we just want to use curl. We added the `curl::multi_fdset()` function to curl; this returns the socket file descriptors to poll for curl's HTTP connections, and also the timeout value that curl prefers. We poll these file descriptors with `processx::poll()`, and if any of them have data, we call `curl::multi_run()`. We also use the returned timeout value as an upper limit for the poll, unless we also have lower limits from other I/O or timers.
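A simplified sketch of this loop, with a fixed 200 ms poll instead of curl's preferred timeout, and an example URL:

```r
library(curl)
library(processx)

pool <- new_pool()
multi_add(new_handle(url = "https://example.com"), pool = pool,
          done = function(res) message("HTTP ", res$status_code))

repeat {
  fds <- multi_fdset(pool)
  # Wait until a curl socket is ready, or at most 200 ms ...
  poll(list(curl_fds(fds)), ms = 200)
  # ... then let curl do a non-blocking slice of work.
  if (multi_run(timeout = 0, pool = pool)$pending == 0) break
}
```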
HTTP queries usually involve DNS resolution. This is done automatically by libcurl, but we need to handle it specially, because libcurl does not report a file descriptor for it. E.g. if a curl multi handle has a single pending HTTP query which is in the DNS resolution phase, then `curl::multi_fdset()` returns zero file descriptors. But we still need to call `curl::multi_run()` to allow the DNS resolution to complete, and the actual HTTP transfer to start. To work around this, we always call `curl::multi_run()` if some curl handles are in the DNS resolution phase and we used curl's returned timeout value for the poll.
HTTP timeouts also have to be handled specially, because we need to call `curl::multi_run()` even if there is no data on the curl sockets. Similarly to the DNS resolution workaround, we always call `curl::multi_run()` if we used curl's timeout for the poll. This makes sure that it is called no later than requested by curl itself, so curl can take care of the HTTP timeouts.
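Both workarounds reduce to the same rule. The sketch below uses illustrative names and assumes that `fds$timeout` is in seconds, which may not match your curl version; it is not the actual async event loop code:

```r
library(curl)
library(processx)

pool <- new_pool()
multi_add(new_handle(url = "https://example.com"), pool = pool)

fds <- multi_fdset(pool)
# curl's preferred timeout; the unit is an assumption in this sketch
curl_timeout_ms <- if (fds$timeout > 0) fds$timeout * 1000 else Inf
timer_timeout_ms <- 5000                  # e.g. the next async timer
timeout_ms <- min(curl_timeout_ms, timer_timeout_ms)

ret <- poll(list(curl_fds(fds)), ms = as.integer(timeout_ms))

# Call multi_run() if a curl socket fired, *or* if the poll was capped
# by curl's own timeout: this covers DNS progress and HTTP timeouts.
if (any(unlist(ret) != "timeout") || curl_timeout_ms <= timer_timeout_ms) {
  multi_run(timeout = 0, pool = pool)
}
```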
Polling a mix of processx connections (IOCP) and curl sockets is not trivial on Windows. We cannot add the sockets to processx's IOCP, because that would cause complications if libcurl also started to use IOCP for them, and also with the buffering. The right approach is to use `select()` for these sockets, in parallel with the IOCP poll. So we do these in two threads.
The main thread runs `select()` and the background thread polls the IOCP. If either thread sees data, it must wake up the other thread. For this we add an extra socket on localhost to the fd set of curl sockets. This is the "notification socket", and the IOCP polling thread writes data to this socket as soon as its poll has finished. Similarly, the main thread puts a special event into the IOCP as soon as the `select()` has finished.
To avoid race conditions, neither thread can assume that it woke up the other, even if it has sent a wake-up event. We need to keep this in mind when writing the Windows poll code in processx. In particular:

* the `select()` thread may or may not return "real" results, independently of whether the notification socket is set,
* the IOCP thread may or may not return a real result. A "non-real" result can be recognized from the special key and a zero `OVERLAPPED` pointer.
Nested event loops are in general fine, and they “just work”, because we are polling the I/O of the active event loop only.
There are some complications with the worker pool, however. In particular, worker pool tasks might terminate when their event loop is inactive. See the section about the worker pool for more about this.
Another potential issue with nested event loops is that HTTP requests might time out, and HTTP connections might disconnect, while their event loop is inactive. There is not much we can do about this without, for example, running the HTTP on a background thread. But that is difficult as well, since we cannot call R functions from the background thread, so we would need to use libcurl's C multi handles directly, which is less than ideal.
The worker pool is a set of background processes, controlled via `callr::r_session` objects. `call_function()` queues a function call to run on the worker pool, and the async event loop schedules the queued tasks. The workers can be polled via their poll connections.
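As a hedged sketch of what a single worker amounts to, using the plain callr API directly (the exact shape of `$read()`'s return value may differ slightly across callr versions):

```r
library(callr)

# One worker: a persistent background R session.
rs <- r_session$new()

# Start a remote call; this returns immediately.
rs$call(function(x) x + 1, args = list(41))

# The poll connection signals when the call has finished ...
rs$poll_process(2000)
#> [1] "ready"

# ... then the result can be collected.
rs$read()$result
#> [1] 42

rs$close()
```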
There is a single worker pool per R process. If multiple event loops are active, then they share the worker pool. The reason for this is that it would be too costly to start a new worker pool whenever an event loop is created (i.e. for each `synchronise()` call). In general, the lifetime of the worker pool is typically longer than the lifetime of an event loop.
Sharing the worker pool causes some complications, which we need to handle in the event loop. In particular, a worker pool task might finish while its event loop is not active. In this case we free up the worker, but do not remove the task's result from the worker pool. When its event loop becomes active again, it checks for any tasks that were completed while it was inactive.
Moreover, when choosing a new task to run on the worker pool, we may choose a task that does not belong to the active event loop. This is fine; our only restriction is that a task's result must be removed from the worker pool only when its event loop is active.