Logo Search packages:      
Sourcecode: r-cran-multicore version File versions  Download package

fork.c

/* multicore R package

   fork.c
   interface to system-level tools for spawning copies of the current
   process and IPC

   (C)Copyright 2008-11 Simon Urbanek

   see package DESCRIPTION for licensing terms */

#include <sys/types.h>
#include <unistd.h>

#ifndef WIN32
/* --- plain unix part --- */
#include <sys/select.h>
#include <sys/wait.h>
#else
/* --- work arounds for Windows --- */
#include <windows.h>
#include "winfix.h"
#define read _read
#define write _write
#define close _close
#define select pipe_select
#endif
#include <signal.h>

#include <R.h>
#include <Rinternals.h>
#include <Rconfig.h> /* for AQUA */
#if HAVE_AQUA
#include <R_ext/Rdynload.h>
#endif

#ifndef FILE_LOG
/* use printf instead of Rprintf for debugging to avoid forked console interactions */
#define Dprintf printf
#else
/* logging into a file */
#include <stdarg.h>
/* Debug logger used when FILE_LOG is defined: appends one record to
   "mc_debug.txt" (opened relative to the current working directory).
   Each record starts with "<pid>> " for the calling process, followed
   by the printf-style expansion of `format` and its arguments.
   If the file cannot be opened the message is silently dropped. */
void Dprintf(char *format, ...) {
      va_list ap;
      FILE *log;

      va_start(ap, format);
      log = fopen("mc_debug.txt", "a");
      if (log != NULL) {
            fprintf(log, "%d> ", getpid());
            vfprintf(log, format, ap);
            fclose(log);
      }
      va_end(ap);
}
#endif

/* Bookkeeping record for one forked child; the master keeps these in a
   singly linked list headed by `children`. */
typedef struct child_info {
      pid_t pid;     /* process id of the child */
      int pfd, sifd; /* pfd: master's read end of the child->master data pipe,
                        sifd: master's write end of the pipe feeding the child's stdin;
                        -1 once the descriptor has been closed */
#ifdef WIN32
      HANDLE mutex; /* mutex for releasing a child */
#endif
      struct child_info *next; /* next record in the list (NULL at the end) */
} child_info_t;

/* head of the linked list of registered children (NULL when the list is empty) */
static child_info_t *children;

/* descriptor used to write to the master; -1 until mc_fork() sets it in a child */
static int master_fd = -1;
/* 1 in the master process; mc_fork() sets it to 0 in the forked child */
static int is_master = 1;

/* Unregister the child with process id `pid`.

   The first matching record is unlinked from the `children` list, any
   descriptor it still holds (pfd / sifd > 0) is closed, the record is
   freed and the child is told to go away: SIGUSR1 on unix, mutex
   release followed by TerminateProcess on Windows.

   Returns 1 if a record was found and removed, 0 if no child with that
   pid is registered. */
static int rm_child_(int pid) {
      child_info_t **link; /* points at the pointer that references `node` */
      child_info_t *node;
#ifdef WIN32
      HANDLE mutex;
#endif
#ifdef MC_DEBUG
      Dprintf("removing child %d\n", pid);
#endif
      for (link = &children; (node = *link) != 0; link = &node->next) {
            if (node->pid != pid)
                  continue;
#ifdef WIN32
            /* remember the handle, the record is gone by the time we use it */
            mutex = node->mutex;
#endif
            /* make sure we close all descriptors */
            if (node->pfd > 0) {
                  close(node->pfd);
                  node->pfd = -1;
            }
            if (node->sifd > 0) {
                  close(node->sifd);
                  node->sifd = -1;
            }
            /* splice the record out of the list and release it */
            *link = node->next;
            free(node);
#ifdef WIN32
            ReleaseMutex(mutex);
            CloseHandle(mutex);
            /* just in case doesn't really work ... */
            TerminateProcess((HANDLE) pid, 0);
#else
            kill(pid, SIGUSR1); /* send USR1 to the child to make sure it exits */
#endif
            return 1;
      }
#ifdef MC_DEBUG
      Dprintf("WARNING: child %d was to be removed but it doesn't exist\n", pid);
#endif
      return 0;
}

/* Fallback values for the standard descriptor numbers, used only when the
   system headers included above did not define them. */
#ifndef STDIN_FILENO
#define STDIN_FILENO 0
#endif
#ifndef STDOUT_FILENO
#define STDOUT_FILENO 1
#endif
#ifndef STDERR_FILENO
#define STDERR_FILENO 2
#endif

/* State shared between ordinary child code and the SIGUSR1 handler.
   child_can_exit    - becomes 1 once the master has signalled SIGUSR1
   child_exit_status - exit status the handler should use; a negative
                       value means "do not exit from the handler"
   Both are written/read asynchronously by the signal handler, hence
   volatile sig_atomic_t so that the polling code always re-reads them. */
static volatile sig_atomic_t child_can_exit = 0, child_exit_status = -1;

#ifndef WIN32
/* Signal handler installed in the child for SIGUSR1: records that the
   master allows the child to terminate and, if a non-negative exit status
   has been recorded, terminates the process with it. Other signals are
   ignored by this handler.
   NOTE(review): exit() is not async-signal-safe; kept as is because
   switching to _exit() would skip the normal process cleanup - confirm
   which behaviour is wanted. */
static void child_sig_handler(int sig) {
      if (sig == SIGUSR1) {
#ifdef MC_DEBUG
            Dprintf("child process %d got SIGUSR1; child_exit_status=%d\n", getpid(), (int) child_exit_status);
#endif
            child_can_exit = 1;
            if (child_exit_status >= 0)
                  exit(child_exit_status);
      }
}
#else
/* Windows: mutex created by mc_fork() that the child waits on in mc_exit() */
HANDLE child_release_mutex;
#endif

#if HAVE_AQUA
/* from aqua.c */
extern void (*ptr_R_ProcessEvents)(void);

/* 1 while the Quartz symbol lookup still has to be attempted, 0 once it has been done */
static int find_quartz_symbols = 1;
/* filled in by getQuartzSymbols(); remains NULL if the symbol was not found */
void (*QuartzCocoa_InhibitEventLoop)(int);
/* function-pointer type used to cast the result of the symbol lookup */
typedef void (*QuartzCocoa_InhibitEventLoop_t)(int);

/* unfortunately Rdynload.h forgets to declare it so the API is broken - we need to fix it */
struct Rf_RegisteredNativeSymbol {
  NativeSymbolType type;
  void *fn, *dll;
};

/* Check whether Quartz is loaded and, if so, try to resolve
   QuartzCocoa_InhibitEventLoop.
   Returns -1 if Quartz has not been found loaded (the check is repeated on
   the next call), 1 if QuartzCocoa_InhibitEventLoop has been found and
   0 if Quartz is loaded but the symbol could not be resolved.
   Once Quartz has been seen the lookup is performed only once
   (find_quartz_symbols is cleared). */
static int getQuartzSymbols() {
    if (find_quartz_symbols) {
      R_RegisteredNativeSymbol symbol = {R_ANY_SYM, NULL, NULL};
      if (R_FindSymbol("getQuartzAPI", "", &symbol)) { /* is Quartz loaded? if not, we have nothing to worry about */
          /* unfortunately R disables dynamic lookup in grDevices so we need to get at it manually;
             this means that we need to get the corresponding DllInfo to enable it, then look up the symbol and disable it again */
          /* NOTE(review): neither the call built by lang2() nor the value returned by eval()
             is PROTECTed here - confirm this is safe with respect to garbage collection */
          SEXP getNativeSymbolInfo = install("getNativeSymbolInfo");
          SEXP nsi = eval(lang2(getNativeSymbolInfo, mkString("getQuartzAPI")), R_GlobalEnv);
          /* get nsi[[3]][[2]] which should be the path (we verify every step) */
          if (TYPEOF(nsi) == VECSXP && LENGTH(nsi) > 2) {
            SEXP pkg = VECTOR_ELT(nsi, 2);
            if (TYPEOF(pkg) == VECSXP && LENGTH(pkg) > 1) {
                SEXP dpath = VECTOR_ELT(pkg, 1);
                if (TYPEOF(dpath) == STRSXP && LENGTH(dpath) > 0) {
                  /* this is technically unnecessary since nsi actually contains
                     the EXTPTR holding the DllInfo, but we'll play it safe here */
                  DllInfo *dll = R_getDllInfo(CHAR(STRING_ELT(dpath, 0)));
                  if (dll) {
                      /* note: this local struct type and `symbol` shadow the outer declarations of the same names */
                      struct Rf_RegisteredNativeSymbol { NativeSymbolType type; void *fn, *dll; } symbol = { R_ANY_SYM, NULL, NULL };
                      R_useDynamicSymbols(dll, TRUE); /* turn on dynamic symbols */
                      /* it would be faster to use R_dlsym since we already have DllInfo but that is hidden so let's waste more cycles.. */
                      QuartzCocoa_InhibitEventLoop = (QuartzCocoa_InhibitEventLoop_t) R_FindSymbol("QuartzCocoa_InhibitEventLoop", "grDevices", (R_RegisteredNativeSymbol*) &symbol);
                      R_useDynamicSymbols(dll, FALSE); /* turn them off - we got what we want */
                  }
                }
            }
          }
          /* do not try again since we did all the work */
          find_quartz_symbols = 0;
      }
    }
    /* -1: lookup not performed yet; otherwise 1/0 depending on whether the pointer was resolved */
    return find_quartz_symbols ? -1 : ((QuartzCocoa_InhibitEventLoop) ? 1 : 0);
}
#else
/* Build without Aqua support: there is nothing to look up, so always report -1. */
static int getQuartzSymbols(void) { return -1; }
#endif

/* Fork the current process and set up the two pipes connecting master
   and child.

   Returns an integer vector of length 3:
     [0]  value returned by fork(): the child's pid in the master, 0 in the child
     [1]  master: read end of the child->master data pipe
          child:  write end of that pipe (also stored in master_fd)
     [2]  master: write end of the pipe connected to the child's stdin
          child:  -1 (the child has no such descriptor to report)

   In the child, stdin is re-mapped to the read end of the stdin pipe,
   is_master is cleared and (on unix) the SIGUSR1 handler is installed.
   In the master, the new child is prepended to the `children` list.

   Raises an R error if a pipe cannot be created, fork() fails or the
   child record cannot be allocated. */
SEXP mc_fork() {
      int pipefd[2]; /* data pipe: child writes, master reads */
      int sipfd[2];  /* stdin pipe: master writes, child reads */
      pid_t pid;
      SEXP res = allocVector(INTSXP, 3);
      int *res_i = INTEGER(res);
      if (pipe(pipefd)) error("Unable to create a pipe.");
      if (pipe(sipfd)) {
            close(pipefd[0]); close(pipefd[1]);
            error("Unable to create a pipe.");
      }
#ifdef WIN32
      {
            SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE };
            child_release_mutex = CreateMutex(&sa, TRUE, NULL);
      }
#endif
      getQuartzSymbols(); /* initialize Quartz symbols if needed (noop on non-Aqua systems) */

      pid = fork();
      if (pid == -1) {
            perror("fork");
            close(pipefd[0]); close(pipefd[1]);
            close(sipfd[0]); close(sipfd[1]);
#ifdef WIN32
            CloseHandle(child_release_mutex);
#endif
            error("Unable to fork.");
      }
      res_i[0] = (int) pid;
      if (pid == 0) { /* child */
            close(pipefd[0]); /* close read end */
            master_fd = res_i[1] = pipefd[1];
            /* the third slot is only meaningful in the master; give it a
               defined "no descriptor" value instead of returning garbage */
            res_i[2] = -1;
            is_master = 0;
#if HAVE_AQUA
            ptr_R_ProcessEvents = NULL; /* disable ProcessEvent since we can't call CF from now on */
#endif
            /* re-map stdin */
            dup2(sipfd[0], STDIN_FILENO);
            close(sipfd[0]);
            /* master uses USR1 to signal that the child process can terminate */
            child_exit_status = -1;
            child_can_exit = 0;
#ifndef WIN32
            signal(SIGUSR1, child_sig_handler);
#endif
#if HAVE_AQUA
            /* Quartz runs the event loop so we need to stop it if we can */
            if (QuartzCocoa_InhibitEventLoop)
                QuartzCocoa_InhibitEventLoop(1);
#endif
#ifdef MC_DEBUG
            Dprintf("child process %d started\n", getpid());
#endif
      } else { /* master process */
            child_info_t *ci;
            close(pipefd[1]); /* close write end of the data pipe */
            close(sipfd[0]);  /* close read end of the child-stdin pipe */
            res_i[1] = pipefd[0];
            res_i[2] = sipfd[1];
#ifdef MC_DEBUG
            Dprintf("parent registers new child %d\n", pid);
#endif
            /* register the new child and its pipes */
            ci = (child_info_t*) malloc(sizeof(child_info_t));
            if (!ci) {
                  /* nothing will ever track these descriptors once we bail
                     out, so release the master's pipe ends before raising */
                  close(pipefd[0]);
                  close(sipfd[1]);
                  error("Memory allocation error.");
            }
            ci->pid = pid;
            ci->pfd = pipefd[0];
            ci->sifd= sipfd[1];
#ifdef WIN32
            ci->mutex = child_release_mutex;
            /* since we're now forked, the pipes (and mutex) should not be inherited by other children (note that this may mess up FD handling but children should not use those anyway) */
            SetHandleInformation((HANDLE)_get_osfhandle(ci->pfd), HANDLE_FLAG_INHERIT, 0);
            SetHandleInformation((HANDLE)_get_osfhandle(ci->sifd), HANDLE_FLAG_INHERIT, 0);
            SetHandleInformation(child_release_mutex, HANDLE_FLAG_INHERIT, 0);
#if 0
            /* also Windows doesn't support concurrent stdout/err, so we can close them */
            close(STDOUT_FILENO);
            close(STDERR_FILENO);
            /* ok, the next one is insane - we abuse R_SetWin32 to clear out (possibly suicidal) callbacks */
            structRstart s;
            s.rhome = R_Home;
            s.home = getenv("HOME");
            s.CharacterMode = RTerm;
            s.ReadConsole 
            s.WriteConsole
            s.WriteConsoleEx
            s.CallBack
            s.ShowMessage
            s.YesNoCancel
            s.Busy
            s.NoRenviron = 1;
            R_SetWin32(&s);
#endif
            
#endif
            /* prepend the record to the list of children */
            ci->next = children;
            children = ci;
      }
      return res;
}

/* Close the standard output descriptor of the calling process.
   The result of close() is not checked; always returns R's NULL. */
SEXP close_stdout() {
      (void) close(STDOUT_FILENO);
      return R_NilValue;
}

/* Close the standard error descriptor of the calling process.
   The result of close() is not checked; always returns R's NULL. */
SEXP close_stderr() {
      (void) close(STDERR_FILENO);
      return R_NilValue;
}

/* Close every descriptor listed in the integer vector sFDS.
   Raises an R error if sFDS is not an integer vector; failures of the
   individual close() calls are ignored. Returns TRUE. */
SEXP close_fds(SEXP sFDS) {
      int k, count;
      int *list;
      if (TYPEOF(sFDS) != INTSXP) error("descriptors must be integers");
      count = LENGTH(sFDS);
      list = INTEGER(sFDS);
      for (k = 0; k < count; k++)
            close(list[k]);
      return ScalarLogical(1);
}

/* Child side: send a raw vector to the master over master_fd.

   The message consists of the payload length (an unsigned int in native
   byte order) followed by the payload bytes. Note that a zero-length
   payload produces the same header that mc_exit() sends as its exit
   marker.

   Raises an R error when called in the master, when there is no pipe to
   the master, when `what` is not a raw vector, or when a write fails (in
   which case the pipe is closed and master_fd reset to -1).
   Returns TRUE on success. */
SEXP send_master(SEXP what) {
      unsigned char *payload;
      unsigned int total = 0, sent;
      if (is_master) error("only children can send data to the master process");
      if (master_fd == -1) error("there is no pipe to the master process");
      if (TYPEOF(what) != RAWSXP) error("content to send must be RAW, use serialize if needed");
      total = LENGTH(what);
      payload = RAW(what);
#ifdef MC_DEBUG
      Dprintf("child %d: send_master (%d bytes)\n", getpid(), total);
#endif
      /* header: payload length */
      if (write(master_fd, &total, sizeof(total)) != sizeof(total)) {
            close(master_fd);
            master_fd = -1;
            error("write error, closing pipe to the master");
      }
      /* payload: keep writing until everything has been handed to the pipe */
      for (sent = 0; sent < total; ) {
            int n = write(master_fd, payload + sent, total - sent);
            if (n < 1) {
                  close(master_fd);
                  master_fd = -1;
                  error("write error, closing pipe to the master");
            }
            sent += n;
      }
      return ScalarLogical(1);
}

/* Master side: write the raw vector `what` to the stdin pipe of the
   child whose pid is given by sPid.

   Raises an R error when called in a child, when `what` is not a raw
   vector, when no child with that pid is registered, or when a write
   fails. Returns TRUE once all bytes have been written. */
SEXP send_child_stdin(SEXP sPid, SEXP what) {
      unsigned char *b;
      unsigned int len = 0, i = 0;
      int fd; /* descriptors are ints (and may be -1), so do not store them unsigned */
      int pid = asInteger(sPid);
      if (!is_master) error("only master (parent) process can send data to a child process");
      if (TYPEOF(what) != RAWSXP) error("what must be a raw vector");
      child_info_t *ci = children;
      /* locate the record of the requested child */
      while (ci) {
            if (ci->pid == pid) break;
            ci = ci -> next;
      }
      if (!ci) error("child %d doesn't exist", pid);
      len = LENGTH(what);
      b = RAW(what);
      fd = ci -> sifd;
      /* write() may accept fewer bytes than requested, so loop until done */
      while (i < len) {
            int n = write(fd, b + i, len - i);
            if (n < 1)
                  error("write error");
            i += n;
      }
      return ScalarLogical(1);
}

/* Wait until at least one child has data available on its pipe.

   sTimeout - a single real: seconds to wait; a negative value waits
              indefinitely. Anything else leaves the timeout at zero.
   sWhich   - optional non-empty integer vector of pids; when given, only
              those children are watched.

   Returns
     NULL               if there is no child descriptor to watch (or none
                        of the requested pids matched),
     FALSE              if select() failed,
     TRUE               on timeout,
     an integer vector  with the pids of all children that are readable.

   As a side effect, terminated children are reaped with waitpid() (unix)
   and records whose pipe descriptor is already -1 are removed. */
SEXP select_children(SEXP sTimeout, SEXP sWhich) {
      int top_fd = 0, ready, nready = 0, zombies = 0;
      unsigned int wlen = 0, wcount = 0;
      SEXP res;
      int *out, *which = 0;
      child_info_t *ci;
      fd_set fs;
      struct timeval tv = { 0, 0 }, *tvp = &tv;

      if (isReal(sTimeout) && LENGTH(sTimeout) == 1) {
            double tov = asReal(sTimeout);
            if (tov >= 0.0) {
                  tv.tv_sec = (int) tov;
                  tv.tv_usec = (int) ((tov - ((double) tv.tv_sec)) * 1000000.0);
            } else
                  tvp = 0; /* Note: I'm not sure we really should allow this .. */
      }
      if (TYPEOF(sWhich) == INTSXP && LENGTH(sWhich)) {
            which = INTEGER(sWhich);
            wlen = LENGTH(sWhich);
      }
#ifndef WIN32
      { int wstat; while (waitpid(-1, &wstat, WNOHANG) > 0) {}; } /* check for zombies */
#endif
      /* build the descriptor set and note the highest descriptor seen */
      FD_ZERO(&fs);
      for (ci = children; ci && ci->pid; ci = ci->next) {
            if (ci->pfd == -1) zombies++;
            if (ci->pfd > top_fd) top_fd = ci->pfd;
            if (ci->pfd <= 0) continue;
            if (!which)
                  FD_SET(ci->pfd, &fs);
            else { /* check for the FD only if it's on the list */
                  unsigned int k;
                  for (k = 0; k < wlen; k++)
                        if (which[k] == ci->pid) {
                              FD_SET(ci->pfd, &fs);
                              wcount++;
                              break;
                        }
            }
      }
#ifdef MC_DEBUG
      Dprintf("select_children: maxfd=%d, wlen=%d, wcount=%d, zombies=%d, timeout=%d:%d\n", top_fd, wlen, wcount, zombies, (int)tv.tv_sec, (int)tv.tv_usec);
#endif
      /* this should never really happen - it did while there was a bug in
         rm_child_ - but clean up records without a pipe just in case;
         the list is rescanned from the head after every removal */
      while (zombies) {
            int removed = 0;
            for (ci = children; ci; ci = ci->next) {
                  if (ci->pfd == -1) {
#ifdef MC_DEBUG
                        Dprintf("detected zombie: pid=%d, pfd=%d, sifd=%d\n", ci->pid, ci->pfd, ci->sifd);
#endif
                        rm_child_(ci->pid);
                        zombies--;
                        removed = 1;
                        break;
                  }
            }
            if (!removed) break;
      }
      if (top_fd == 0 || (wlen && !wcount)) return R_NilValue; /* NULL signifies no children to tend to */
      ready = select(top_fd + 1, &fs, 0, 0, tvp);
#ifdef MC_DEBUG
      Dprintf("  sr = %d\n", ready);
#endif
      if (ready < 0) {
            perror("select");
            return ScalarLogical(0); /* FALSE on select error */
      }
      if (ready < 1) return ScalarLogical(1); /* TRUE on timeout */
      /* pass 1 - count the readable descriptors (in theory that is what select returned) */
      for (ci = children; ci && ci->pid; ci = ci->next)
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) nready++;
#ifdef MC_DEBUG
      Dprintf(" - read select %d children: ", nready);
#endif
      res = allocVector(INTSXP, nready);
      out = INTEGER(res);
      /* pass 2 - store the pids of the readable children */
      for (ci = children; ci && ci->pid; ci = ci->next) {
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) {
                  *out++ = ci->pid;
#ifdef MC_DEBUG
                  Dprintf("%d ", ci->pid);
#endif
            }
      }
#ifdef MC_DEBUG
      Dprintf("\n");
#endif
      return res;
}

/* Read one message from the data pipe of child `ci`.

   A message is an unsigned int length followed by that many bytes.
   Returns
     - a raw vector holding the payload, with the integer attribute "pid"
       set to the child's pid, or
     - an integer scalar holding the child's pid if the length header could
       not be read in full, the length is 0 (the child announced its exit)
       or the payload could not be read completely. In that case the pipe
       is closed and the child is removed via rm_child_(), so `ci` must not
       be used by the caller afterwards. */
static SEXP read_child_ci(child_info_t *ci) {
      unsigned int len = 0;
      int fd = ci->pfd;
      int n = read(fd, &len, sizeof(len));
#ifdef MC_DEBUG
      Dprintf(" read_child_ci(%d) - read length returned %d\n", ci->pid, n);
#endif
      if (n != sizeof(len) || len == 0) { /* error or child is exiting */
            int pid = ci->pid;
            close(fd);
            ci->pfd = -1;
            rm_child_(pid);
            return ScalarInteger(pid);
      } else {
            SEXP rv = allocVector(RAWSXP, len);
            unsigned char *rvb = RAW(rv);
            unsigned int i = 0;
            /* read() may deliver the payload in pieces */
            while (i < len) {
                  n = read(fd, rvb + i, len - i);
#ifdef MC_DEBUG
                  Dprintf(" read_child_ci(%d) - read %d at %d returned %d\n", ci->pid, len-i, i, n);
#endif
                  if (n < 1) {
                        int pid = ci->pid;
                        close(fd);
                        ci->pfd = -1;
                        rm_child_(pid);
                        return ScalarInteger(pid);
                  }
                  i += n;
            }
            PROTECT(rv);
            {
                  /* install() may allocate, so the attribute value has to be
                     protected as well until it is attached to rv */
                  SEXP pa = PROTECT(allocVector(INTSXP, 1));
                  INTEGER(pa)[0] = ci->pid;
                  setAttrib(rv, install("pid"), pa);
            }
            UNPROTECT(2);
            return rv;
      }
}

/* Read one message from the child with the given pid.
   Returns NULL if no such child is registered, otherwise whatever
   read_child_ci() returns for that child. */
SEXP read_child(SEXP sPid) {
      int pid = asInteger(sPid);
      child_info_t *ci;
      for (ci = children; ci; ci = ci->next)
            if (ci->pid == pid)
                  return read_child_ci(ci);
#ifdef MC_DEBUG
      Dprintf("read_child(%d) - pid is not in the list of children\n", pid);
#endif
      return R_NilValue; /* if the child doesn't exist anymore, returns NULL */
}

/* Wait for data from any child and read one message from the first
   readable child found in the list.

   sTimeout - a single real: seconds to wait; a negative value waits
              indefinitely. Anything else leaves the timeout at zero.

   Returns NULL if there is no child descriptor to watch, FALSE if
   select() failed, TRUE on timeout, otherwise the result of
   read_child_ci() for the selected child.
   Terminated children are reaped with waitpid() first (unix only). */
SEXP read_children(SEXP sTimeout) {
      int top_fd = 0, ready;
      child_info_t *ci;
      fd_set fs;
      struct timeval tv = { 0, 0 }, *tvp = &tv;

      if (isReal(sTimeout) && LENGTH(sTimeout) == 1) {
            double tov = asReal(sTimeout);
            if (tov >= 0.0) {
                  tv.tv_sec = (int) tov;
                  tv.tv_usec = (int) ((tov - ((double) tv.tv_sec)) * 1000000.0);
            } else
                  tvp = 0; /* Note: I'm not sure we really should allow this .. */
      }
#ifndef WIN32
      { int wstat; while (waitpid(-1, &wstat, WNOHANG) > 0) {}; } /* check for zombies */
#endif
      /* watch the data pipe of every child that still has one */
      FD_ZERO(&fs);
      for (ci = children; ci && ci->pid; ci = ci->next) {
            if (ci->pfd > top_fd) top_fd = ci->pfd;
            if (ci->pfd > 0) FD_SET(ci->pfd, &fs);
      }
#ifdef MC_DEBUG
      Dprintf("read_children: maxfd=%d, timeout=%d:%d\n", top_fd, (int)tv.tv_sec, (int)tv.tv_usec);
#endif
      if (top_fd == 0) return R_NilValue; /* NULL signifies no children to tend to */
      ready = select(top_fd+1, &fs, 0, 0, tvp);
#ifdef MC_DEBUG
      Dprintf("sr = %d\n", ready);
#endif
      if (ready < 0) {
            perror("select");
            return ScalarLogical(0); /* FALSE on select error */
      }
      if (ready < 1) return ScalarLogical(1); /* TRUE on timeout */
      /* pick the first child whose descriptor is flagged readable */
      for (ci = children; ci && ci->pid; ci = ci->next)
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) break;
#ifdef MC_DEBUG
      Dprintf("set ci=%p (%d, %d)\n", (void*) ci, ci?ci->pid:0, ci?ci->pfd:0);
#endif
      /* a NULL ci should never occur really - select signalled a read handle
         but none of the handles is set - treat that as a timeout */
      return ci ? read_child_ci(ci) : ScalarLogical(1);
}

/* R-level wrapper around rm_child_(): removes the child with the given
   pid and returns TRUE if it was registered, FALSE otherwise. */
SEXP rm_child(SEXP sPid) {
      int removed = rm_child_(asInteger(sPid));
      return ScalarLogical(removed);
}

/* Return an integer vector with the pids of the registered children, in
   list order. Traversal stops at the end of the list or at the first
   record whose pid is not positive. */
SEXP mc_children() {
      unsigned int n = 0;
      child_info_t *ci;
      SEXP res;
      /* first pass: count */
      for (ci = children; ci && ci->pid > 0; ci = ci->next)
            n++;
      res = allocVector(INTSXP, n);
      if (n != 0) {
            /* second pass: copy the pids */
            int *out = INTEGER(res);
            for (ci = children; ci && ci->pid > 0; ci = ci->next)
                  *out++ = ci->pid;
      }
      return res;
}

/* Return an integer vector with one descriptor per registered child, in
   list order: the data pipe descriptor (pfd) when sFdi is 0, the
   child-stdin descriptor (sifd) for any other value. Traversal stops at
   the end of the list or at the first record whose pid is not positive. */
SEXP mc_fds(SEXP sFdi) {
      int want_data_pipe = (asInteger(sFdi) == 0);
      unsigned int n = 0;
      child_info_t *ci;
      SEXP res;
      /* first pass: count */
      for (ci = children; ci && ci->pid > 0; ci = ci->next)
            n++;
      res = allocVector(INTSXP, n);
      if (n != 0) {
            /* second pass: copy the selected descriptor of each child */
            int *out = INTEGER(res);
            for (ci = children; ci && ci->pid > 0; ci = ci->next) {
                  if (want_data_pipe)
                        *out++ = ci->pfd;
                  else
                        *out++ = ci->sifd;
            }
      }
      return res;
}


/* Return the current value of master_fd as an integer scalar
   (-1 when there is no pipe to the master). */
SEXP mc_master_fd() {
      const int fd = master_fd;
      return ScalarInteger(fd);
}

/* Return TRUE when running in a forked child (is_master is 0),
   FALSE in the master process. */
SEXP mc_is_child() {
      return ScalarLogical(!is_master);
}

/* Send signal sSig to process sPid using kill().
   Returns TRUE on success and raises an R error if kill() fails.
   On Windows an R error is always raised since signals are unsupported. */
SEXP mc_kill(SEXP sPid, SEXP sSig) {
#ifdef WIN32
      error("signals are not supported on Windows");
      return R_NilValue;
#else
      pid_t target = (pid_t) asInteger(sPid);
      int signo = asInteger(sSig);
      if (kill(target, signo) != 0)
            error("Kill failed.");
      return ScalarLogical(1);
#endif
}

/* Terminate a child process with exit status sRes.

   If the pipe to the master is still open, a zero length header is sent
   to announce the exit and the pipe is closed. The child then waits for
   the master's permission to leave: on unix it polls child_can_exit once
   per second (the flag is set by the SIGUSR1 handler), on Windows it
   waits on child_release_mutex.

   Raises an R error when called in the master process; otherwise it
   does not return. */
SEXP mc_exit(SEXP sRes) {
      int status = asInteger(sRes);
#ifdef MC_DEBUG
      Dprintf("child %d: exit called\n", getpid());
#endif
      if (is_master) error("exit can only be used in a child process");
      if (master_fd != -1) { /* send 0 to signify that we're leaving */
            unsigned int zero = 0;
            write(master_fd, &zero, sizeof(zero));
            /* make sure the pipe is closed before we enter any waiting */
            close(master_fd);
            master_fd = -1;
      }
#ifdef WIN32
      /* master locks the mutex until it's ready to collect the result */
      WaitForSingleObject(child_release_mutex, INFINITE);
#else
#ifdef MC_DEBUG
      if (!child_can_exit)
            Dprintf("child %d is waiting for permission to exit\n", getpid());
#endif
      /* sleep in one second steps until the master has sent SIGUSR1 */
      while (!child_can_exit)
            sleep(1);
#endif

#ifdef MC_DEBUG
      Dprintf("child %d: exiting\n", getpid());
#endif
      exit(status);
      /* not reached unless exit() returns */
      error("exit failed");
      return R_NilValue;
}

/* this is not really necessary, since from R you can simply use
   is.loaded("QuartzCocoa_InhibitEventLoop") and it will be TRUE if we
   got to it. */
SEXP mc_can_disable_quartz() {
      /* the raw result of getQuartzSymbols() is handed to Rf_ScalarLogical unchanged;
         NOTE(review): that includes the -1 "not looked up" value - confirm how
         callers are meant to interpret it */
      int found = getQuartzSymbols();
      return Rf_ScalarLogical(found);
}

Generated by  Doxygen 1.6.0   Back to index