Parallel Processing of Directories/Files

I've been working with a rules engine product recently that stores all of its instructions in XML files. To support their rating algorithms, my company now has over 11,000 XML files stored in over 800 directories. These files can relate to one another through a sort of cascading inheritance. Considering the amount of data, I was looking for a way to visualize it, but before I could even begin putting the data into a graph, I had to parse all of the files to build the relationships.

My first attempt used Groovy's eachFileRecurse method to walk all of the files and parse them, but it took about 12 minutes to run, which was far too long. So I tried running the parsing in parallel, which improved the time considerably, but it still ran for several minutes.

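import groovy.io.FileType
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Fixed pool of 20 workers; each file is parsed as its own task
ExecutorService executor = Executors.newFixedThreadPool(20)
directory.eachFileRecurse(FileType.FILES) { File file ->
  executor.execute(buildParser(manuscripts, file))
}
// No more tasks will be submitted, so wait for the queued ones to finish
executor.shutdown()
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)

// execute() takes a Runnable, so the closure is coerced to Runnable
private static Runnable buildParser(List manuscripts, File file) {
  return { /* do stuff */ } as Runnable
}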

My next step was to change how I was parsing the XML files. Originally I was reading each file line by line and running XmlParser on the line that held the properties tag.

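String line
file.withReader {
  // Scan line by line and stop at the first <properties ...> tag
  while ((line = it.readLine()) != null) {
    if (line =~ /^\s*<properties/) {
      // Turn an open tag (<properties ...>) into a self-closing one
      // (<properties .../>) so the single line is well-formed XML
      line = line.replaceFirst('([^/])>\\s*$', '$1/>')

      def properties = new XmlParser().parseText(line)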
      def props = [manuscriptID: properties.@manuscriptID?.toString(),
             inherited: properties.@inherited?.toString(),
             caption: properties.@caption?.toString()]
      if (props.inherited) {
        manuscripts << props
      }
      break
    }
  }
}
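
To make the string manipulation concrete, here is a minimal sketch of what that replaceFirst call does to the two shapes of tag. The sample lines and the normalize closure are made up for illustration; only the regular expression comes from the snippet above.

```groovy
// Hypothetical sample lines; attribute values are invented for illustration.
def selfClosing  = '<properties manuscriptID="A1" inherited="B2" caption="Auto"/>'
def withChildren = '<properties manuscriptID="A1" inherited="B2" caption="Auto">  '

// A trailing '>' that is not preceded by '/' becomes '/>';
// trailing whitespace after the '>' is dropped along the way.
def normalize = { String line -> line.replaceFirst('([^/])>\\s*$', '$1/>') }

// Already self-closing: left untouched.
assert normalize(selfClosing) == selfClosing

// Open tag with children on later lines: closed in place.
assert normalize(withChildren) == selfClosing
```

Either way, the result is a single well-formed element that can be handed to a parser on its own.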

Sometimes the properties tag was self-closing and other times it had children, so I had to manipulate the String before I could apply XmlParser to it. It didn't seem that bad, but it definitely wasn't fast, so I decided to try a SAX parser. It made a drastic improvement: I was now down to 20-30 seconds to get through all 11,000+ files.

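import javax.xml.parsers.SAXParserFactory
import org.xml.sax.Attributes
import org.xml.sax.InputSource
import org.xml.sax.helpers.DefaultHandler

// StopProcessingException is a custom exception (definition not shown)
// that is thrown to abort parsing once the needed attributes are read
class SaxManuscriptPropertiesHandler extends DefaultHandler {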
  Map manuscriptProperties
  
  void startElement(String ns, String localName, String qName, Attributes atts) {
    switch (qName) {
      case 'properties':
        manuscriptProperties = [id: atts.getValue('manuscriptID'),
                                desc: atts.getValue('caption')]
        if (atts.getValue('inherited')) {
          manuscriptProperties['parent'] = atts.getValue('inherited')
        }
        //purposeful fall through to end processing
      case 'model':
        throw new StopProcessingException()
    }
  }
  // Parses one file, stopping as soon as the handler has what it needs
  static Map parseManuscript(File file) {
    SaxManuscriptPropertiesHandler handler = new SaxManuscriptPropertiesHandler()
    def reader = SAXParserFactory.newInstance().newSAXParser().XMLReader
    reader.setContentHandler(handler)

    // withInputStream closes the stream even when parsing is aborted early
    file.withInputStream { stream ->
      try {
        reader.parse(new InputSource(stream))
      } catch (StopProcessingException e) {
        //finished reading XML document
      }
    }
    return handler.manuscriptProperties
  }
}
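
The handler above depends on a StopProcessingException that isn't shown. As a rough sketch, assuming it is nothing more than an unchecked marker exception, the early-exit trick can be tried out on an in-memory document. The EarlyExitHandler class and the sample XML below are hypothetical and exist only for this demonstration.

```groovy
import javax.xml.parsers.SAXParserFactory
import org.xml.sax.Attributes
import org.xml.sax.InputSource
import org.xml.sax.helpers.DefaultHandler

// Assumption: a plain unchecked exception used purely as a stop signal.
class StopProcessingException extends RuntimeException {}

// Hypothetical, trimmed-down handler: counts the elements it sees and
// stops the parse as soon as it reaches <properties>.
class EarlyExitHandler extends DefaultHandler {
  int elementsSeen = 0
  String caption

  void startElement(String ns, String localName, String qName, Attributes atts) {
    elementsSeen++
    if (qName == 'properties') {
      caption = atts.getValue('caption')
      throw new StopProcessingException()
    }
  }
}

// Made-up document; <child/> and <model/> should never be visited.
def xml = '<manuscript><properties caption="x"><child/></properties><model/></manuscript>'

def handler = new EarlyExitHandler()
def reader = SAXParserFactory.newInstance().newSAXParser().XMLReader
reader.contentHandler = handler
try {
  reader.parse(new InputSource(new StringReader(xml)))
} catch (StopProcessingException ignored) {
  // expected: the parse was cut short on purpose
}

// Only <manuscript> and <properties> were visited before the parse stopped.
assert handler.elementsSeen == 2
assert handler.caption == 'x'
```

Because the properties tag sits near the top of each file, stopping there means most of every document is never read at all, which is presumably where much of the speedup comes from.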

Getting there, but I still wanted to see if it could be better, so I set some timers in my code to see where the remaining time was being spent and discovered that walking the directories to find the files was still taking a lot of time. The eachFileRecurse call was still single-threaded, so I decided to parallelize that as well. In the new approach, each directory is submitted to the thread pool and processed separately.

The only problem I ran into was that I couldn't use the shutdown/awaitTermination approach anymore, because the main thread would reach the shutdown call before all of the directories and files had been added to the executor pool. So I switched the wait condition to sleep and poll until all of the threads had finished processing.

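import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor

class ManuscriptDirectoryParser {
  // Thread-safe list, since many worker threads append results to it
  def manuscripts = [] as CopyOnWriteArrayList
  ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(20)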
  
  public parseDirectory(String directory) {
    processDirectory(new File(directory))
    // Poll until the pool is idle: no task running and nothing left queued
    while (!executor.shutdown) {
      Thread.sleep(1000)
      if (executor.activeCount == 0 && executor.queue.empty) {
        executor.shutdown()
      }
    }
    //write results to file
  }
  private processDirectory(File file) {
    // listFiles() returns null if the directory cannot be read
    file.listFiles()?.each { File child ->
      if (child.directory) {
        executor.execute({processDirectory(child)} as Runnable)
      } else {
        executor.execute({
          manuscripts << SaxManuscriptPropertiesHandler.parseManuscript(child) 
        } as Runnable)
      }
    }
  }
}
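
The sleep loop works, but it can wait up to a second longer than necessary and relies on the pool's approximate active count. One possible alternative, sketched here and not what I actually ran, is to count outstanding tasks and release a latch when the count drops to zero. The CountingDirectoryWalker class, its submit helper, the pool size of 4, and the temporary directory tree are all made up for this example, and collecting file names stands in for the real parsing step.

```groovy
import java.nio.file.Files
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical variant of the directory walker that tracks pending tasks.
class CountingDirectoryWalker {
  final List<String> found = new CopyOnWriteArrayList<String>()
  final ExecutorService executor = Executors.newFixedThreadPool(4)
  final AtomicInteger pending = new AtomicInteger()
  final CountDownLatch done = new CountDownLatch(1)

  List<String> walk(File root) {
    submit { processDirectory(root) }
    done.await()        // released when the last outstanding task finishes
    executor.shutdown()
    return found
  }

  // The task is counted before it is queued, and a directory task queues
  // its children before it finishes, so the counter can only reach zero
  // once the whole tree has been processed.
  void submit(Closure work) {
    pending.incrementAndGet()
    executor.execute {
      try {
        work()
      } finally {
        if (pending.decrementAndGet() == 0) {
          done.countDown()
        }
      }
    }
  }

  void processDirectory(File dir) {
    (dir.listFiles() ?: []).each { File child ->
      if (child.directory) {
        submit { processDirectory(child) }
      } else {
        submit { found << child.name }   // stand-in for parsing the file
      }
    }
  }
}

// Made-up tree: root/a.xml, root/sub/b.xml, root/sub/deep/c.xml
File root = Files.createTempDirectory('walker-demo').toFile()
File deep = new File(root, 'sub/deep')
deep.mkdirs()
new File(root, 'a.xml').text = '<a/>'
new File(root, 'sub/b.xml').text = '<b/>'
new File(deep, 'c.xml').text = '<c/>'

def names = new CountingDirectoryWalker().walk(root).toSorted()
assert names == ['a.xml', 'b.xml', 'c.xml']
root.deleteDir()
```

The trade-off is a little extra bookkeeping around every task in exchange for returning as soon as the work is actually done.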

Together, these improvements meant that I could now run the parsing job in 14 seconds, considerably better than the 12+ minutes where I started.
