
Using lists in foreach R

I am trying to parallelize the extraction of data from HTML documents and store it in data.frames (there are several million documents, hence the need for parallelization).

As a first step, on the machine where I register the queue, I select a subset of the HTML files and lapply the read_html function to them (from the rvest package; I also tried the similar function from the XML package, but ran into memory leak problems) to obtain a single list storing the content of many HTML pages.

Then I use an iterator on this list to obtain smaller chunks of it to be fed to the foreach.

Inside the foreach I build the data.frame(s) (using the html_table function and some basic data manipulation) and return a list whose elements are the cleaned data.frames.

I have tried both the doSNOW backend on Windows 8 and the doRedis backend on Ubuntu 16.04.

In the first case a list of empty lists is returned, while in the second a segfault ('memory not mapped') is thrown; you can find the traceback at the very bottom of the question.

To my understanding, the (chunks of) lists that I am sending to the cores are not behaving as I expect. I have gathered that the list object may be just a set of pointers, but I have not been able to confirm it; could this be the issue? Is there an alternative to the "list way" of "encapsulating" the data of multiple HTML pages?

Below you can find some code reproducing the issue. I am completely new to Stack Overflow, new to parallel programming and fairly new to R programming: any advice for improvement is welcome. Thank you all in advance.

library(rvest)
library(foreach)

# Wikipedia pages of Olympic medalists between 1992 and 2016 are
# downloaded for reproducibility
for(i in seq(1992, 2016, by=4)){

  html = paste("https://en.wikipedia.org/wiki/List_of_", i, "_Summer_Olympics_medal_winners", sep="")
  con = url(html)
  htmlCode = readLines(con)
  writeLines(htmlCode, con=paste(i, "medalists", sep="_"))
  close(con)

}

#declaring the redis backend (doSNOW code is also included below)

#note that I am using the package from 
#devtools::install_github("bwlewis/doRedis") due to a "nodelay error"
#(more info on that here: https://github.com/bwlewis/doRedis/issues/24)
# if it is not your case please drop the nodelay and timeout options

#Registering cores ---Ubuntu---
cores=2
library('doRedis')
options('redis:num'=TRUE)
registerDoRedis("jobs", nodelay=FALSE)
startLocalWorkers(n=cores, "jobs", timeout=2, nodelay=FALSE)
foreachOpt <- list(preschedule=FALSE)


#Registering cores ---Win---
#cores=2
#library("doSNOW")
#registerDoSNOW(makeCluster(cores, type = "SOCK"))


#defining the iterator
iterator <- function(x, ...) {
  i <- 1
  it <- idiv(length(x), ...)

  if(exists("chunks")){
    nextEl <- function() {
      n <- nextElem(it)
      ix <- seq(i, length=n)
      i <<- i + n
      x[ix]
    }
  }else{
    nextEl <- function() {
      n <- nextElem(it)
      ix <- seq(i, i+n-1)
      i <<- i + n
      x[ix]
    }
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c(
    'ivector', 'abstractiter','iter')
  obj
}

#reading files
names_files<-list.files()
html_list<-lapply(names_files, read_html)

#creating iterator
ChunkSize_html_list<-2
iter<-iterator(html_list, chunkSize=ChunkSize_html_list)

#defining expanding list (thanks StackOverflow and many thanks to
#JanKanis's answer : http://stackoverflow.com/questions/2436688/append-an-object-to-a-list-in-r-in-amortized-constant-time-o1  )
expanding_list <- function(capacity = 10) {
  buffer <- vector('list', capacity)
  length <- 0

  methods <- list()

  methods$double.size <- function() {
    buffer <<- c(buffer, vector('list', capacity))
    capacity <<- capacity * 2
  }

  methods$add <- function(val) {
    if(length == capacity) {
      methods$double.size()
    }

    length <<- length + 1
    buffer[[length]] <<- val
  }

  methods$as.list <- function() {
    b <- buffer[0:length]
    return(b)
  }

  methods
}

#parallelized part
clean_data<-foreach(ite=iter, .packages=c("itertools", "rvest"), .combine=c,
 .options.multicore=foreachOpt, .options.redis=list(chunkSize=1)) %dopar% {

  temp_tot <- expanding_list()
  for(g in 1:length(ite)){

    #extraction of data from tables
    tables <- html_table(ite[[g]], fill=T, header = T)

    for(i in 1:length(tables)){

      #just some basic data manipulation
      temp<-lapply(tables, function(d){d[nrow(d),]})
      temp_tot$add(temp)
      rm(temp)
      gc(verbose = F)
    }
  }
  #returning the list of cleaned data.frames to the foreach
  temp_tot$as.list()
}

Error thrown when using the redis backend:

*** caught segfault ***
address 0x60, cause 'memory not mapped'


Traceback:
 1: .Call("xml2_doc_namespaces", PACKAGE = "xml2", doc)
 2: doc_namespaces(doc)
 3: xml_ns.xml_document(x)
 4: xml_ns(x)
 5: xpath_search(x$node, x$doc, xpath = xpath, nsMap = ns, num_results = Inf)
 6: xml_find_all.xml_node(x, ".//table")
 7: xml2::xml_find_all(x, ".//table")
 8: html_table.xml_document(ite[[g]], fill = T, header = T)
 9: html_table(ite[[g]], fill = T, header = T)
10: eval(expr, envir, enclos)
11: eval(.doRedisGlobals$expr, envir = .doRedisGlobals$exportenv)
12: doTryCatch(return(expr), name, parentenv, handler)
13: tryCatchOne(expr, names, parentenv, handlers[[1L]])
14: tryCatchList(expr, classes, parentenv, handlers)
15: tryCatch({    lapply(names(args), function(n) assign(n, args[[n]], pos = .doRedisGlobals$exportenv))    if (exists(".Random.seed", envir = .doRedisGlobals$exportenv)) {        assign(".Random.seed", .doRedisGlobals$exportenv$.Random.seed,             envir = globalenv())    }    tryCatch({        if (exists("set.seed.worker", envir = .doRedisGlobals$exportenv))             do.call("set.seed.worker", list(0), envir = .doRedisGlobals$exportenv)    }, error = function(e) cat(as.character(e), "\n"))    eval(.doRedisGlobals$expr, envir = .doRedisGlobals$exportenv)}, error = function(e) e)
16: FUN(X[[i]], ...)
17: lapply(work[[1]]$argsList, .evalWrapper)
18: redisWorker(queue = "jobs", host = "localhost", port = 6379,     iter = Inf, linger = 30, log = stdout(), timeout = 2, nodelay = FALSE)
aborting ...
asked Sep 01 '16 08:09 by dgdi


1 Answer

I think the problem is that you're creating the XML/HTML document objects on the master by calling "read_html", and then processing them on the workers. I've tried some experiments, and it looks like that doesn't work, probably because those objects can't be serialized, sent to the workers, and then deserialized correctly. I think the objects are corrupt, causing the workers to segfault when they try to operate on them using the "html_table" function.
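To illustrate why serialization is a plausible culprit, here is a minimal sketch, assuming the xml2 document object is a list whose `node` and `doc` fields are external pointers (the `x$node` and `x$doc` accesses in the traceback suggest this). It only inspects the fields and shows that plain text, unlike the pointers, can safely make the round trip that a task makes on its way to a worker. The `<foo>1</foo>` document is just an illustrative stand-in.

```r
library(xml2)

doc <- read_xml("<foo>1</foo>")

# The R object is a thin wrapper: these fields point into memory owned by
# the underlying C library, they do not contain the document's data.
typeof(doc$node)
typeof(doc$doc)

# Plain text survives a serialize/unserialize round trip, which is
# roughly what happens when a task is shipped to a worker.
txt  <- as.character(doc)                   # document rendered as a string
txt2 <- unserialize(serialize(txt, NULL))   # simulated trip to a worker
doc2 <- read_xml(txt2)                      # re-parse on the receiving side
xml_text(doc2)
```

So if the pages really have to be read on the master, sending their text and parsing it on the workers is likely a safer way to "encapsulate" them than sending parsed documents.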

I suggest that you modify your code to iterate over the file names so that the workers can call "read_html" themselves, thus avoiding serializing the XML document objects.
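A rough sketch of that suggestion follows, under some assumptions: it uses the doSNOW backend from the question, and it replaces the downloaded Wikipedia pages with two tiny hypothetical HTML files written to a temporary directory so that it runs without network access. The file names, the table contents and the "last row" manipulation are only placeholders for your real processing.

```r
library(foreach)
library(doSNOW)

# Hypothetical stand-ins for the saved pages: two small HTML files
dir <- tempfile("pages")
dir.create(dir)
for (k in 1:2) {
  writeLines(
    sprintf("<html><body><table><tr><th>id</th></tr><tr><td>%d</td></tr></table></body></html>", k),
    file.path(dir, paste0("page_", k, ".html"))
  )
}
files <- list.files(dir, full.names = TRUE)

cl <- makeCluster(2, type = "SOCK")
registerDoSNOW(cl)

# Iterate over file names (character strings serialize without trouble);
# each worker parses its own file and returns ordinary data.frames.
clean_data <- foreach(f = files, .packages = "rvest", .combine = c) %dopar% {
  doc <- read_html(f)                        # parsed on the worker
  tables <- html_table(doc, header = TRUE)
  # keep the last row of each table, as in the question
  lapply(tables, function(d) d[nrow(d), , drop = FALSE])
}

stopCluster(cl)
```

The only objects crossing process boundaries here are file names on the way out and data.frames on the way back. If you want chunks, you can split the vector of file names into groups and loop over each group inside the worker in the same way.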


Here is some of the test code that I experimented with:

library(xml2)
library(snow)
cl <- makeSOCKcluster(3)
clusterEvalQ(cl, library(xml2))

# Create XML documents on the master
docs <- lapply(1:10,
      function(i) read_xml(paste0("<foo>", i, "</foo>")))

# Call xml_path on XML documents created on master
r1 <- lapply(docs, xml_path)            # correct results
r2 <- clusterApply(cl, docs, xml_path)  # incorrect results

# This seems to work...
docs2 <- clusterApply(cl, 1:10,
      function(i) read_xml(paste0("<foo>", i, "</foo>")))

# But this causes a segfault on the master
print(docs2)

I used snow functions directly to verify that the problem wasn't in foreach or doSNOW.
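For comparison, here is a variant of the same experiment that should avoid both problems, sketched with the base `parallel` package (which provides the snow-style functions) rather than snow itself. The idea is that the documents are created and consumed entirely on the workers, and only plain character strings travel back to the master. The `<foo>` documents are the same toy inputs as above.

```r
library(parallel)

cl <- makePSOCKcluster(2)
invisible(clusterEvalQ(cl, library(xml2)))

# Parse and extract on the worker; only a character string is returned,
# so no XML document object is ever serialized.
vals <- clusterApply(cl, 1:10, function(i) {
  doc <- read_xml(paste0("<foo>", i, "</foo>"))
  xml_text(doc)
})

stopCluster(cl)
unlist(vals)
```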

answered Oct 16 '22 01:10 by Steve Weston