Parallel Batch Inserter with Neo4j imported 20 billion relationships on EC2

Posted by Michael Hunger on Oct 27, 2012 in code, java, neo4j

As massive data insertion performance has bothered me for a while, I made it the subject of my last lab days (20% time) at Neo4j. The results of my work are available on GitHub and I explain the approach below.

Data Insertion Issues

When getting started with a new database like the graph database Neo4j, it is important to quickly get an initial dataset to work with. Either you write a data generator to produce it, or you have existing data in relational or NoSQL databases that you want to import.

In both cases the import is unusual in that often hundreds of millions or billions of nodes and relationships have to be imported in a short time. The normal write path of a graph database is not designed for such insertion speeds. That's why Neo4j has a BatchInserter that can import data quickly by loosening the transactional constraints, but in doing so it works only in a single thread.

If, for instance, only nodes without properties are imported, the inserter reaches a speed of 1 million nodes per second, which is nice. But as soon as relationships and properties come into the picture, the insertion speed drops noticeably. The reason for that degradation is that the single-threaded approach doesn't utilize all the resources available in a modern system: neither the many CPU cores nor the high concurrent write throughput of modern SSDs (up to 200 MB/s, even across multiple streams) is used.
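For context, the single-threaded BatchInserter API is used roughly like this (a minimal sketch; the store directory, property values and relationship type are just placeholders, and the package names are those of the Neo4j 1.8 era):

import org.neo4j.graphdb.DynamicRelationshipType;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserters;

public class BatchInserterExample {
    public static void main(String[] args) {
        // no transactions, a single thread, writing directly to the store files
        BatchInserter inserter = BatchInserters.inserter("target/batch-store");
        try {
            long alice = inserter.createNode(MapUtil.map("name", "Alice"));
            long bob = inserter.createNode(MapUtil.map("name", "Bob"));
            inserter.createRelationship(alice, bob,
                    DynamicRelationshipType.withName("KNOWS"), MapUtil.map("since", 2012));
        } finally {
            inserter.shutdown(); // flushes and closes the store files
        }
    }
}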

Approach to Parallelization

So the idea was to identify independent parts of the insertion process that can be run in parallel and possibly also parallelized internally. I used the LMAX Disruptor as a simple and very scalable framework to parallelize the tasks. It uses a large ring-buffer filled with preallocated, struct-like objects and a lock-free mechanism for synchronizing producer and consumer tasks, which lets it achieve high-throughput, low-latency operation on modern hardware.
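To make the pattern concrete, here is a minimal, generic Disruptor sketch (using the 2.x API that the importer builds on); the ValueEvent type and the values published here are placeholders, not the importer's actual structs:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;

public class DisruptorSketch {
    // preallocated, struct-like event that is reused for every slot of the ring-buffer
    static class ValueEvent { long value; }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(
                new EventFactory<ValueEvent>() {
                    public ValueEvent newInstance() { return new ValueEvent(); }
                },
                executor, new SingleThreadedClaimStrategy(1 << 10), new YieldingWaitStrategy());

        // consumer: runs on its own thread and sees the events in sequence order
        disruptor.handleEventsWith(new EventHandler<ValueEvent>() {
            public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) {
                // process event.value here
            }
        });
        RingBuffer<ValueEvent> ringBuffer = disruptor.start();

        // producer: claim a slot, fill the preallocated event, publish the sequence
        for (long i = 0; i < 100; i++) {
            long seq = ringBuffer.next();
            ringBuffer.get(seq).value = i;
            ringBuffer.publish(seq);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}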

The 7 distinct operations of the batch-insertion process are:

  1. node-id generation
  2. property encoding
  3. property-record creation
  4. relationship-id creation and forward handling of reverse relationship chains
  5. writing node-records
  6. writing relationship-records
  7. writing property-records

The node-id generation doesn't have to be performed as a separate task because the Disruptor already provides a sequence id that increases linearly. Except for the property encoding, all other operations are currently executed in a single instance each, because they share state – either the generated ids or the writing to the store files. It would be possible to stripe them, but that was out of scope for now.

Their dependencies look as follows:

[Figure: task dependencies]

I created a set of handlers for the individual tasks and some smaller abstractions that allow individual strategies for certain operations. The main class is DisruptorBatchInserter, which takes the initial setup and config as well as a NodeStructFactory, which creates the initial struct objects and also fills in the data per node. Its setup and handler wiring look like this:

inserter = (BatchInserterImpl) BatchInserters.inserter(storeDir, config);
inserter.getNeoStore().getNodeStore().setHighId(nodesToCreate + 1);
nodeStructFactory.init(inserter);
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

incomingEventDisruptor = new Disruptor<NodeStruct>(nodeStructFactory, executor,
	       new SingleThreadedClaimStrategy(RING_SIZE), new YieldingWaitStrategy());

createHandlers(inserter.getNeoStore());

// pipeline per the 7 operations above: property encoding fans out first,
// then property-record creation and relationship-id handling run in parallel,
// finally the three store writers
incomingEventDisruptor.
        handleEventsWith(propertyMappingHandlers).
        then(propertyRecordCreatorHandler, relationshipIdHandler).
        then(nodeWriter, relationshipWriter, propertyWriter);
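
// driving the inserter, e.g. from the DisruptorTest class that the Maven command below runs: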
DisruptorBatchInserter inserter = new DisruptorBatchInserter(
   STORE_DIR, config(), NODES_TO_CREATE, new TestNodeStructFactory());
inserter.init();
try {
    inserter.run();
} finally {
    inserter.shutdown();
}
inserter.report();

Execution and Testing

The current setup uses a ring-buffer of about 2^18 to 2^20 elements, a heap size of 5-20 GB, and an MMIO configuration that stays within the heap limits. It can be executed using Maven:

MAVEN_OPTS="-Xmx5G -Xms5G -server -d64 -XX:NewRatio=5"  \
mvn exec:java -Dexec.mainClass=org.neo4j.batchimport.DisruptorTest -Dexec.classpathScope=test
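
The MMIO settings are passed to the inserter as a config map. The exact values live in the repository's test setup; purely as an illustration, such a configuration (the values are placeholders, to be kept within the heap limits mentioned above) could look like:

import java.util.Map;

import org.neo4j.helpers.collection.MapUtil;

// illustrative memory-mapping sizes for the individual store files
Map<String, String> config = MapUtil.stringMap(
        "neostore.nodestore.db.mapped_memory", "1G",
        "neostore.relationshipstore.db.mapped_memory", "2G",
        "neostore.propertystore.db.mapped_memory", "1G",
        "neostore.propertystore.db.strings.mapped_memory", "500M");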

On my iMac with 16 GB RAM and 8 (virtual) CPUs I could insert 40M nodes with 2 properties each and 400M relationships with 1 property each in 900 seconds, with a sustained disk write load of up to 80 MB/s, ending up with a store of 29 GB.

To test it on a more scalable system I used an Amazon EC2 hi1.4xlarge instance (60 GB RAM, 35 compute units, 2×1 TB SSD).
There I inserted the previous data volume in roughly half the time. With a larger set of 100M nodes and one billion relationships, each with their properties, it took 23 minutes to create a store of 75 GB, at write rates of up to 120 MB/s.

A bigger test of 2 billion nodes (2 properties each) and 20 billion relationships (1 property each) resulted in a store of 1.4 TB within 11 hours, keeping a sustained write throughput of roughly 500k elements per second (22 billion nodes and relationships in about 40,000 seconds).

For checking the consistency of the generated store I used the new ConsistencyCheckTool that comes with the Neo4j Enterprise edition. It helped me root out some errors, especially around the bidirectional relationship chains.

Current Limitations, Future Improvements

The current implementation has some constraints by design and one by implementation:

  • the maximum number of relationships per node and the maximum number of properties per node and relationship have to be known in advance
  • relationships have to be pre-sorted by min(start,end) (illustrated in the sketch below)
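
As an illustration of the second constraint, a pre-sorting step could look roughly like the following sketch; RelRecord is a hypothetical placeholder for a line of the relationship file, and only the sort key min(start, end) matters:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// hypothetical relationship record, standing in for one line of the relationship file
class RelRecord {
    final long start, end;
    RelRecord(long start, long end) { this.start = start; this.end = end; }
    public String toString() { return start + " -> " + end; }
}

class SortRelationships {
    public static void main(String[] args) {
        List<RelRecord> rels = new ArrayList<RelRecord>(Arrays.asList(
                new RelRecord(1, 5), new RelRecord(3, 2), new RelRecord(6, 1)));

        // sort by the smaller of the two node ids of each relationship
        Collections.sort(rels, new Comparator<RelRecord>() {
            public int compare(RelRecord a, RelRecord b) {
                long ka = Math.min(a.start, a.end);
                long kb = Math.min(b.start, b.end);
                return ka < kb ? -1 : ka > kb ? 1 : 0;
            }
        });
        System.out.println(rels); // [1 -> 5, 6 -> 1, 3 -> 2], matching the example in the comments below
    }
}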

Future improvements include further striping of the tasks that currently run as a single instance, as well as pulling out the writing of the dynamic record stores (strings and arrays). Taking multiple disks into account when writing in parallel might also yield some performance gains. An important next step will be reimplementing the current CSV importer on top of this.

3 Comments

  • Uldis says:

    Both this blog post and notes to the batch-importer say “relationships have to be pre-sorted by min(start,end)”.

    Could you elaborate on how exactly they need to be sorted (and what happens if they are not sorted as required)?

    I am asking because rels.csv example in https://github.com/jexp/batch-import#readme is not sorted by min(start, end) — the last line is out of place.

  • This is only necessary for the parallel batch inserter, and it means that the relationship file should be sorted by the minimum node-id of the start and end node. So if the relationships were

    1 -> 5
    3 -> 2
    6 -> 1

    the rel-file should be:

    1 -> 5
    6 -> 1
    3 -> 2

    It also helps the normal inserter (it speeds it up because adding relationships doesn't have to walk across the whole relationship store file).

  • Alessandro says:

    Hello Michael,

    Before I begin to use the parallel batch inserter I'd like your experienced opinion.
    I'm trying to insert 1 million nodes into the Neo4j DB.
    The nodes represent a phone-call network, and the important constraint I have is that the node degrees have to follow a power law, which is why I'm using the igraph power-law function (http://igraph.sourceforge.net/doc/python/igraph.GraphBase-class.html#Static_Power_Law). The function generates a power-law graph, which I walk with a script that, for every node, generates a single Cypher string for creating the node, and it does the same thing for every relationship. Here is the code.
    from igraph import *
    from collections import deque
    from random import randint

    DEGREE = 3.16
    EXPONENT = 2.1

    def make_graph(n, d=DEGREE):
        edges = int((d / 2.0) * n)
        g = Graph.Static_Power_Law(n, edges, EXPONENT)
        used = set()
        for i in range(n):
            vicini = g.neighbors(i, mode=OUT)  # vicini = the neighbours of node i
            for c in range(len(vicini)):
                if i in used:
                    if vicini[c] in used:
                        # both nodes already created
                        print "MATCH (n:rsa{name:'nodo" + repr(i) + "'}),(m:rsa{name:'nodo" + repr(vicini[c]) + "'}) MERGE (n)-[:CALLS]->(m);"
                    else:
                        # first node exists, the second one does not
                        print "MATCH (n:rsa{name:'nodo" + repr(i) + "'}) MERGE (n)-[:CALLS]->(:rsa{name:'nodo" + repr(vicini[c]) + "'});"
                        used.add(vicini[c])
                else:
                    if vicini[c] in used:
                        # second node exists, the first one does not
                        print "MATCH (m:rsa{name:'nodo" + repr(vicini[c]) + "'}) MERGE (:rsa{name:'nodo" + repr(i) + "'})-[:CALLS]->(m);"
                        used.add(i)
                    used.add(vicini[c])

    make_graph(1000000, 2.1)

    The output will have ~2.5M Cypher statements, separated by ";" so that I can feed them to the neo4j-shell with -file output.txt and wait.
    The problem is that I think it works, but it is taking something like 2 days on my 4 GB MacBook Air to finish generating the dataset.
    I don't know the number of relationships for every node, since I can't control the igraph function.
    Could you please help me find a solution?

    Thank you so much.
