Logo Search packages:      
Sourcecode: r-cran-multicore version File versions  Download package

fork.c

/* multicore R package

   fork.c
   interface to system-level tools for sawning copies of the current
   process and IPC

   (C)Copyright 2008 Simon Urbanek

   see package DESCRIPTION for licensing terms */

#include <sys/types.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <signal.h>

#include <R.h>
#include <Rinternals.h>

/* use printf instead of Rprintf for debugging to avoid forked console interactions */
#define Dprintf printf

00023 typedef struct child_info {
      pid_t pid;
      int pfd, sifd;
      struct child_info *next;
} child_info_t;

static child_info_t *children;

static int master_fd = -1;
static int is_master = 1;

static int rm_child_(int pid) {
      child_info_t *ci = children, *prev = 0;
#ifdef MC_DEBUG
      Dprintf("removing child %d\n", pid);
#endif
      while (ci) {
            if (ci->pid == pid) {
                  /* make sure we close all descriptors */
                  if (ci->pfd > 0) { close(ci->pfd); ci->pfd = -1; }
                  if (ci->sifd > 0) { close(ci->sifd); ci->sifd = -1; }
                  /* now remove it from the list */
                  if (prev) prev->next = ci->next;
                  else children = ci->next;
                  free(ci);
                  kill(pid, SIGUSR1); /* send USR1 to the child to make sure it exits */
                  return 1;
            }
            prev = ci;
            ci = ci->next;
      }
#ifdef MC_DEBUG
      Dprintf("WARNING: child %d was to be removed but it doesn't exist\n", pid);
#endif
      return 0;
}

#ifndef STDIN_FILENO
#define STDIN_FILENO 0
#endif
#ifndef STDOUT_FILENO
#define STDOUT_FILENO 1
#endif
#ifndef STDERR_FILENO
#define STDERR_FILENO 2
#endif

static int child_can_exit = 0, child_exit_status = -1;

static void child_sig_handler(int sig) {
      if (sig == SIGUSR1) {
#ifdef MC_DEBUG
            Dprintf("child process %d got SIGUSR1; child_exit_status=%d\n", getpid(), child_exit_status);
#endif
            child_can_exit = 1;
            if (child_exit_status >= 0)
                  exit(child_exit_status);
      }
}

SEXP mc_fork() {
      int pipefd[2];
      int sipfd[2];
      pid_t pid;
      SEXP res = allocVector(INTSXP, 3);
      int *res_i = INTEGER(res);
      if (pipe(pipefd)) error("Unable to create a pipe.");
      if (pipe(sipfd)) {
            close(pipefd[0]); close(pipefd[1]);
            error("Unable to create a pipe.");
      }
      pid = fork();
      if (pid == -1) {
            close(pipefd[0]); close(pipefd[1]);
            close(sipfd[0]); close(sipfd[1]);
            error("Unable to fork.");
      }
      res_i[0] = (int) pid;
      if (pid == 0) { /* child */
            close(pipefd[0]); /* close read end */
            master_fd = res_i[1] = pipefd[1];
            is_master = 0;
            /* re-map stdin */
            dup2(sipfd[0], STDIN_FILENO);
            close(sipfd[0]);
            /* master uses USR1 to signal that the child process can terminate */
            child_exit_status = -1;
            child_can_exit = 0;
            signal(SIGUSR1, child_sig_handler);
#ifdef MC_DEBUG
            Dprintf("child process %d started\n", getpid());
#endif
            
      } else { /* master process */
            child_info_t *ci;
            close(pipefd[1]); /* close write end of the data pipe */
            close(sipfd[0]);  /* close read end of the child-stdin pipe */
            res_i[1] = pipefd[0];
            res_i[2] = sipfd[1];
#ifdef MC_DEBUG
            Dprintf("parent registers new child %d\n", pid);
#endif
            /* register the new child and its pipes */
            ci = (child_info_t*) malloc(sizeof(child_info_t));
            if (!ci) error("Memory allocation error.");
            ci->pid = pid;
            ci->pfd = pipefd[0];
            ci->sifd= sipfd[1];
            ci->next = children;
            children = ci;
      }
      return res;
}

SEXP close_stdout() {
      close(STDOUT_FILENO);
      return R_NilValue;
}

SEXP close_stderr() {
      close(STDERR_FILENO);
      return R_NilValue;
}

SEXP close_fds(SEXP sFDS) {
      int *fd, fds, i = 0;
      if (TYPEOF(sFDS) != INTSXP) error("descriptors must be integers");
      fds = LENGTH(sFDS);
      fd = INTEGER(sFDS);
      while (i < fds) close(fd[i++]);
      return ScalarLogical(1);
}

SEXP send_master(SEXP what) {
      unsigned char *b;
      unsigned int len = 0, i = 0;
      if (is_master) error("only children can send data to the master process");
      if (master_fd == -1) error("there is no pipe to the master process");
      if (TYPEOF(what) != RAWSXP) error("content to send must be RAW, use serialize if needed");
      len = LENGTH(what);
      b = RAW(what);
#ifdef MC_DEBUG
      Dprintf("child %d: send_master (%d bytes)\n", getpid(), len);
#endif
      if (write(master_fd, &len, sizeof(len)) != sizeof(len)) {
            close(master_fd);
            master_fd = -1;
            error("write error, closing pipe to the master");
      }
      while (i < len) {
            int n = write(master_fd, b + i, len - i);
            if (n < 1) {
                  close(master_fd);
                  master_fd = -1;
                  error("write error, closing pipe to the master");
            }
            i += n;
      }
      return ScalarLogical(1);
}

SEXP send_child_stdin(SEXP sPid, SEXP what) {
      unsigned char *b;
      unsigned int len = 0, i = 0, fd;
      int pid = asInteger(sPid);
      if (!is_master) error("only master (parent) process can send data to a child process");
      if (TYPEOF(what) != RAWSXP) error("what must be a raw vector");
      child_info_t *ci = children;
      while (ci) {
            if (ci->pid == pid) break;
            ci = ci -> next;
      }
      if (!ci) error("child %d doesn't exist", pid);
      len = LENGTH(what);
      b = RAW(what);
      fd = ci -> sifd;
      while (i < len) {
            int n = write(fd, b + i, len - i);
            if (n < 1)
                  error("write error");
            i += n;
      }
      return ScalarLogical(1);
}

SEXP select_children(SEXP sTimeout, SEXP sWhich) {
      int maxfd = 0, sr, wstat, zombies = 0;
      unsigned int wlen = 0, wcount = 0;
      SEXP res;
      int *res_i, *which = 0;
      child_info_t *ci = children;
      fd_set fs;
      struct timeval tv = { 0, 0 }, *tvp = &tv;
      if (isReal(sTimeout) && LENGTH(sTimeout) == 1) {
            double tov = asReal(sTimeout);
            if (tov < 0.0) tvp = 0; /* Note: I'm not sure we really should allow this .. */
            else {
                  tv.tv_sec = (int) tov;
                  tv.tv_usec = (int) ((tov - ((double) tv.tv_sec)) * 1000000.0);
            }
      }
      if (TYPEOF(sWhich) == INTSXP && LENGTH(sWhich)) {
            which = INTEGER(sWhich);
            wlen = LENGTH(sWhich);
      }
      while (waitpid(-1, &wstat, WNOHANG) > 0) {}; /* check for zombies */
      FD_ZERO(&fs);
      while (ci && ci->pid) {
            if (ci->pfd == -1) zombies++;
            if (ci->pfd > maxfd) maxfd = ci->pfd;
            if (ci->pfd > 0) {
                  if (which) { /* check for the FD only if it's on the list */
                        unsigned int k = 0;
                        while (k < wlen) if (which[k++] == ci->pid) { FD_SET(ci->pfd, &fs); wcount++; break; }
                  } else
                        FD_SET(ci->pfd, &fs);
            }
            ci = ci -> next;
      }
#ifdef MC_DEBUG
      Dprintf("select_children: maxfd=%d, wlen=%d, wcount=%d, zombies=%d, timeout=%d:%d\n", maxfd, wlen, wcount, zombies, (int)tv.tv_sec, (int)tv.tv_usec);
#endif
      if (zombies) { /* oops, this should never really hapen - it did while we had a bug in rm_child_ but hopefully not anymore */
            while (zombies) { /* this is rather more complicated than it should be if we used pointers to delete, but well ... */
                  ci = children;
                  while (ci) {
                        if (ci->pfd == -1) {
#ifdef MC_DEBUG
                              Dprintf("detected zombie: pid=%d, pfd=%d, sifd=%d\n", ci->pid, ci->pfd, ci->sifd);
#endif
                              rm_child_(ci->pid);
                              zombies--;
                              break;
                        }
                        ci = ci->next;
                  }
                  if (!ci) break;
            }
      }
      if (maxfd == 0 || (wlen && !wcount)) return R_NilValue; /* NULL signifies no children to tend to */
      sr = select(maxfd + 1, &fs, 0, 0, tvp);
#ifdef MC_DEBUG
      Dprintf("  sr = %d\n", sr);
#endif
      if (sr < 0) {
            perror("select");
            return ScalarLogical(0); /* FALSE on select error */
      }
      if (sr < 1) return ScalarLogical(1); /* TRUE on timeout */
      ci = children;
      maxfd = 0;
      while (ci && ci->pid) { /* pass 1 - count the FDs (in theory not necessary since that's what select should have returned)  */
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) maxfd++;
            ci = ci -> next;
      }
      ci = children;
#ifdef MC_DEBUG
      Dprintf(" - read select %d children: ", maxfd);
#endif
      res = allocVector(INTSXP, maxfd);
      res_i = INTEGER(res);
      while (ci && ci->pid) { /* pass 2 - fill the array */
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) (res_i++)[0] = ci->pid;
#ifdef MC_DEBUG
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) Dprintf("%d ", ci->pid);
#endif
            ci = ci -> next;
      }
#ifdef MC_DEBUG
      Dprintf("\n");
#endif
      return res;
}

static SEXP read_child_ci(child_info_t *ci) {
      unsigned int len = 0;
      int fd = ci->pfd;
      int n = read(fd, &len, sizeof(len));
#ifdef MC_DEBUG
      Dprintf(" read_child_ci(%d) - read length returned %d\n", ci->pid, n);
#endif
      if (n != sizeof(len) || len == 0) { /* error or child is exiting */
            int pid = ci->pid;
            close(fd);
            ci->pfd = -1;
            rm_child_(pid);
            return ScalarInteger(pid);
      } else {
            SEXP rv = allocVector(RAWSXP, len);
            unsigned char *rvb = RAW(rv);
            unsigned int i = 0;
            while (i < len) {
                  n = read(fd, rvb + i, len - i);
#ifdef MC_DEBUG
                  Dprintf(" read_child_ci(%d) - read %d at %d returned %d\n", ci->pid, len-i, i, n);
#endif
                  if (n < 1) {
                        int pid = ci->pid;
                        close(fd);
                        ci->pfd = -1;
                        rm_child_(pid);
                        return ScalarInteger(pid);
                  }
                  i += n;
            }
            PROTECT(rv);
            {
                  SEXP pa = allocVector(INTSXP, 1);
                  INTEGER(pa)[0] = ci->pid;
                  setAttrib(rv, install("pid"), pa);
            }
            UNPROTECT(1);
            return rv;
      }
}

SEXP read_child(SEXP sPid) {
      int pid = asInteger(sPid);
      child_info_t *ci = children;
      while (ci) {
            if (ci->pid == pid) break;
            ci = ci->next;
      }
#ifdef MC_DEBUG
      if (!ci) Dprintf("read_child(%d) - pid is not in the list of children\n", pid);
#endif
      if (!ci) return R_NilValue; /* if the child doesn't exist anymore, returns NULL */
      return read_child_ci(ci);     
}

SEXP read_children(SEXP sTimeout) {
      int maxfd = 0, sr, wstat;
      child_info_t *ci = children;
      fd_set fs;
      struct timeval tv = { 0, 0 }, *tvp = &tv;
      if (isReal(sTimeout) && LENGTH(sTimeout) == 1) {
            double tov = asReal(sTimeout);
            if (tov < 0.0) tvp = 0; /* Note: I'm not sure we really should allow this .. */
            else {
                  tv.tv_sec = (int) tov;
                  tv.tv_usec = (int) ((tov - ((double) tv.tv_sec)) * 1000000.0);
            }
      }
      while (waitpid(-1, &wstat, WNOHANG) > 0) {}; /* check for zombies */
      FD_ZERO(&fs);
      while (ci && ci->pid) {
            if (ci->pfd > maxfd) maxfd = ci->pfd;
            if (ci->pfd > 0) FD_SET(ci->pfd, &fs);
            ci = ci -> next;
      }
#ifdef MC_DEBUG
      Dprintf("read_children: maxfd=%d, timeout=%d:%d\n", maxfd, (int)tv.tv_sec, (int)tv.tv_usec);
#endif
      if (maxfd == 0) return R_NilValue; /* NULL signifies no children to tend to */
      sr = select(maxfd+1, &fs, 0, 0, tvp);
#ifdef MC_DEBUG
      Dprintf("sr = %d\n", sr);
#endif
      if (sr < 0) {
            perror("select");
            return ScalarLogical(0); /* FALSE on select error */
      }
      if (sr < 1) return ScalarLogical(1); /* TRUE on timeout */
      ci = children;
      while (ci && ci->pid) {
            if (ci->pfd > 0 && FD_ISSET(ci->pfd, &fs)) break;
            ci = ci -> next;
      }
#ifdef MC_DEBUG
      Dprintf("set ci=%p (%d, %d)\n", (void*) ci, ci?ci->pid:0, ci?ci->pfd:0);
#endif
      /* this should never occur really - select signalled a read handle
         but none of the handles is set - let's treat it as a timeout */
      if (!ci) return ScalarLogical(1);
      else
            return read_child_ci(ci);
      /* we should never land here */
      return R_NilValue;
}

SEXP rm_child(SEXP sPid) {
      int pid = asInteger(sPid);
      return ScalarLogical(rm_child_(pid));
}

SEXP mc_children() {
      unsigned int count = 0;
      SEXP res;
      int *pids;
      child_info_t *ci = children;
      while (ci && ci->pid > 0) {
            count++;
            ci = ci->next;
      }
      res = allocVector(INTSXP, count);
      if (count) {
            pids = INTEGER(res);
            ci = children;
            while (ci && ci->pid > 0) {
                  (pids++)[0] = ci->pid;
                  ci = ci->next;
            }
      }
      return res;
}

SEXP mc_fds(SEXP sFdi) {
      int fdi = asInteger(sFdi);
      unsigned int count = 0;
      SEXP res;
      child_info_t *ci = children;
      while (ci && ci->pid > 0) {
            count++;
            ci = ci->next;
      }
      res = allocVector(INTSXP, count);
      if (count) {
            int *fds = INTEGER(res);
            ci = children;
            while (ci && ci->pid > 0) {
                  (fds++)[0] = (fdi == 0) ? ci->pfd : ci->sifd;
                  ci = ci->next;
            }
      }
      return res;
}


SEXP mc_master_fd() {
      return ScalarInteger(master_fd);
}

SEXP mc_is_child() {
      return ScalarLogical(is_master?0:1);
}

SEXP mc_kill(SEXP sPid, SEXP sSig) {
      int pid = asInteger(sPid);
      int sig = asInteger(sSig);
      if (kill((pid_t) pid, sig))
            error("Kill failed.");
      return ScalarLogical(1);
}

SEXP mc_exit(SEXP sRes) {
      int res = asInteger(sRes);
#ifdef MC_DEBUG
      Dprintf("child %d: exit called\n", getpid());
#endif
      if (is_master) error("exit can only be used in a child process");
      if (master_fd != -1) { /* send 0 to signify that we're leaving */
            unsigned int len = 0;
            write(master_fd, &len, sizeof(len));
            /* make sure the pipe is closed before we enter any waiting */
            close(master_fd);
            master_fd = -1;
      }
      if (!child_can_exit) {
#ifdef MC_DEBUG
            Dprintf("child %d is waiting for permission to exit\n", getpid());
#endif
            while (!child_can_exit) {
                  sleep(1);
            }
      }
            
#ifdef MC_DEBUG
      Dprintf("child %d: exiting\n", getpid());
#endif
      exit(res);
      error("exit failed");
      return R_NilValue;
}

Generated by  Doxygen 1.6.0   Back to index