On Creating a MapDB Schema Index Provider for Neo4j 2.0
Writing a Neo4j 2.0 Schema Index Provider for MapDB
Neo4j 2.0 introduced real automatic indexes, backed by a new indexing subsystem SPI. I thought it would be worthwhile to try out that SPI and provide a faster indexing implementation than the default Lucene one. I chose MapDB for it, and the results are here on GitHub.
Using the index is quite easy from Cypher and the other APIs:
// Cypher
CREATE INDEX ON :Label(property)
// e.g.
CREATE INDEX ON :Person(name)
// the index is used automatically, but can be enforced with
MATCH (n:Person) USING INDEX n:Person(name) WHERE n.name = "Andres" RETURN n
// Java
Label LABEL = DynamicLabel.label("foo");
String PROPERTY = "bar";
// Creation
Transaction tx = db.beginTx();
IndexCreator indexCreator = db.schema().indexCreator(LABEL).on(PROPERTY);
IndexDefinition indexDefinition = indexCreator.create();
tx.success(); tx.finish();
db.schema().awaitIndexOnline(indexDefinition, 5, TimeUnit.SECONDS);
// Usage, get Index Information
IndexDefinition index = IteratorUtil.single(db.schema().getIndexes(LABEL));
assertEquals(LABEL.name(), index.getLabel().name());
// Create matching Node
tx = db.beginTx(); // reuse the variable declared above
Node node = db.createNode(LABEL);
node.setProperty(PROPERTY, 42);
tx.success(); tx.finish();
// Find nodes
ResourceIterable<Node> nodes = db.findNodesByLabelAndProperty(LABEL, PROPERTY, 42);
MapDB is a capable implementation of an efficient in-memory and persistent map structure, available either as a B-tree or as a hash map. It supports optimized serialization of arbitrary Java objects including collections, compresses data on the fly (including id-compression), and much more. A very important feature for using MapDB as an index provider is its support for snapshots.
Its support for transaction-like semantics allows batch updates, which suits the index provider well because it applies its updates in batches too.
A code example from the MapDB website:
import org.mapdb.*;
//Configure and open database using builder pattern.
DB db = DBMaker.newFileDB(new File("testdb")).closeOnJvmShutdown().make();
//create new collection (or open existing)
ConcurrentNavigableMap map = db.getTreeMap("collectionName");
map.put(1,"one");
map.put(2,"two");
//persist changes into disk, there is also rollback() method
db.commit();
db.close();
So choosing MapDB as an index provider was really straightforward. Now the only remaining task is to implement the SPI.
The requirements for implementing the SPI are quite simple. We have to tie into Neo4j’s lifecycle management with an IndexProviderFactory that registers the index provider. The provider itself extends SchemaIndexProvider and supplies an IndexPopulator and an IndexAccessor, which handle index updates, and an IndexReader, which has to provide a repeatable-read snapshot view of the data in the index. Actually, I just copied the code from org.neo4j.kernel.impl.api.index.InMemoryIndexProvider and adapted it for MapDB.
The MapDbIndexProviderFactory is tiny; it just returns the single instance of MapDbSchemaIndexProvider in the newKernelExtension lifecycle method.
@Service.Implementation(KernelExtensionFactory.class)
public class MapDbIndexProviderFactory extends KernelExtensionFactory<MapDbIndexProviderFactory.Dependencies> {
    public interface Dependencies {}

    private final MapDbSchemaIndexProvider singleProvider;

    public MapDbIndexProviderFactory() {
        // name and version
        super(new SchemaIndexProvider.Descriptor("mapdb-index", "1.0"));
        this.singleProvider = new MapDbSchemaIndexProvider();
    }

    @Override
    public Lifecycle newKernelExtension(Dependencies dependencies) throws Throwable {
        return singleProvider;
    }
}
To register the MapDbIndexProviderFactory we have to provide a file named org.neo4j.kernel.extension.KernelExtensionFactory in META-INF/services that contains the fully qualified name of our Factory, in its role as KernelExtensionFactory, which is: org.neo4j.index.mapdb.MapDbIndexProviderFactory.
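As a sketch, the registration file could look like the following. The src/main/resources prefix is an assumption based on the usual Maven layout; only the META-INF/services path and the class name come from the text above.

```text
# src/main/resources/META-INF/services/org.neo4j.kernel.extension.KernelExtensionFactory
org.neo4j.index.mapdb.MapDbIndexProviderFactory
```

Lines starting with # are treated as comments in service provider files, so the first line is only there to show where the file lives.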
The MapDbSchemaIndexProvider extends SchemaIndexProvider. It is also an instance of Lifecycle, so it implements init(), start(), stop() and shutdown(). In the constructor it registers itself with a descriptor and a priority (2 is higher than the default 1 for Lucene) and creates a MapDB database instance, which is used later on.
public MapDbSchemaIndexProvider() {
    super(new SchemaIndexProvider.Descriptor("mapdb-index", "1.0"), 2);
    db = DBMaker.newFileDB(new File("mapdb-index"))
            .compressionEnable().closeOnJvmShutdown().make();
}
It keeps an internal CopyOnWriteHashMap of index instances keyed by index id, each of which is represented by the appropriate MapDB tree-map. The three methods from SchemaIndexProvider provide access to each concrete index (indexId is unique per declared :Label(property) combination).
@Override
public MapDbIndex getOnlineAccessor(long indexId) {
    MapDbIndex index = indexes.get(indexId);
    if (index == null || index.state != InternalIndexState.ONLINE)
        throw new IllegalStateException("Index " + indexId + " not online yet");
    return index;
}

@Override
public InternalIndexState getInitialState(long indexId) {
    MapDbIndex index = indexes.get(indexId);
    return index != null ? index.state : InternalIndexState.POPULATING;
}

@Override
public MapDbIndex getPopulator(long indexId) {
    BTreeMap<Object,Set<Long>> map = db.getTreeMap(String.valueOf(indexId));
    MapDbIndex index = new MapDbIndex(map,db);
    indexes.put(indexId, index);
    return index;
}
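The interplay of these three methods can be tried out without Neo4j on the classpath. The following is a minimal sketch, not the provider itself: IndexRegistrySketch, its State enum and the populationCompleted() method are hypothetical names, and the assumption that finishing population is what switches an index to ONLINE is mine, not something shown in the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, stdlib-only stand-in for the provider's index bookkeeping.
final class IndexRegistrySketch {
    enum State { POPULATING, ONLINE }

    static final class Index {
        volatile State state = State.POPULATING;

        // Assumption: completing population is what flips the index to ONLINE.
        void populationCompleted() {
            state = State.ONLINE;
        }
    }

    private final ConcurrentMap<Long, Index> indexes = new ConcurrentHashMap<Long, Index>();

    // Registers a fresh index under its id and hands it out for population.
    Index getPopulator(long indexId) {
        Index index = new Index();
        indexes.put(indexId, index);
        return index;
    }

    // Unknown indexes are reported as POPULATING so that they get (re)built.
    State getInitialState(long indexId) {
        Index index = indexes.get(indexId);
        return index != null ? index.state : State.POPULATING;
    }

    // Only indexes that are known and ONLINE may be handed out for reading.
    Index getOnlineAccessor(long indexId) {
        Index index = indexes.get(indexId);
        if (index == null || index.state != State.ONLINE) {
            throw new IllegalStateException("Index " + indexId + " not online yet");
        }
        return index;
    }
}
```

Asking for the online accessor before populationCompleted() has been called fails with an IllegalStateException, which mirrors the check in getOnlineAccessor above.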
getPopulator returns the IndexPopulator, which is responsible for updating the index. That happens within a separate class called MapDbIndex, which handles the addition and removal of batches of value->nodeId pairs in the MapDB tree-map instance. All of this happens in the implementations of updateAndCommit and recover, which both call update(Iterable updates). That method then inspects the mode of each NodePropertyUpdate to either add, remove or update data. In this demo I base the implementation on storing Sets of Long values for the node ids. The real implementation is a bit more involved to save space and skip (un-)boxing.
private void add(Object value, Long id) {
    Set<Long> ids = indexData.get(value);
    if (ids == null) ids = new HashSet<Long>();
    ids.add(id);
    indexData.put(value, ids);
}

private void remove(Object value, Long id) {
    Set<Long> ids = indexData.get(value);
    if (ids == null) return;
    ids.remove(id);
    // drop the entry entirely once no node ids are left for this value
    if (ids.isEmpty()) indexData.remove(value);
    else indexData.put(value, ids);
}

public void update(Iterable<NodePropertyUpdate> updates) {
    for (NodePropertyUpdate update : updates) {
        switch (update.getUpdateMode()) {
            case ADDED:
                add(update.getValueAfter(), update.getNodeId());
                break;
            case CHANGED:
                remove(update.getValueBefore(), update.getNodeId());
                add(update.getValueAfter(), update.getNodeId());
                break;
            case REMOVED:
                remove(update.getValueBefore(), update.getNodeId());
                break;
            default:
                throw new UnsupportedOperationException();
        }
    }
    // persist the whole batch in one go
    db.commit();
}
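To see the three update modes in isolation, here is a minimal, self-contained sketch that runs without Neo4j or MapDB. PropertyChange is a hypothetical stand-in for NodePropertyUpdate, and a java.util.TreeMap takes the place of the MapDB tree-map; neither is part of the actual provider, and there is no commit step because nothing is persisted.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for Neo4j's NodePropertyUpdate (assumption, not the real class).
final class PropertyChange {
    enum Mode { ADDED, CHANGED, REMOVED }

    final Mode mode;
    final long nodeId;
    final Object before;
    final Object after;

    private PropertyChange(Mode mode, long nodeId, Object before, Object after) {
        this.mode = mode;
        this.nodeId = nodeId;
        this.before = before;
        this.after = after;
    }

    static PropertyChange added(long nodeId, Object after) {
        return new PropertyChange(Mode.ADDED, nodeId, null, after);
    }

    static PropertyChange changed(long nodeId, Object before, Object after) {
        return new PropertyChange(Mode.CHANGED, nodeId, before, after);
    }

    static PropertyChange removed(long nodeId, Object before) {
        return new PropertyChange(Mode.REMOVED, nodeId, before, null);
    }
}

final class IndexUpdateSketch {
    // TreeMap stands in for the MapDB tree-map; keys must be mutually comparable.
    private final Map<Object, Set<Long>> indexData = new TreeMap<Object, Set<Long>>();

    void update(Iterable<PropertyChange> changes) {
        for (PropertyChange change : changes) {
            switch (change.mode) {
                case ADDED:
                    add(change.after, change.nodeId);
                    break;
                case CHANGED:
                    // a changed value is a removal of the old plus an addition of the new
                    remove(change.before, change.nodeId);
                    add(change.after, change.nodeId);
                    break;
                case REMOVED:
                    remove(change.before, change.nodeId);
                    break;
                default:
                    throw new UnsupportedOperationException();
            }
        }
    }

    // Returns a read-only copy of the node ids stored for a value.
    Set<Long> lookup(Object value) {
        Set<Long> ids = indexData.get(value);
        return ids == null ? Collections.<Long>emptySet()
                           : Collections.unmodifiableSet(new HashSet<Long>(ids));
    }

    int distinctValues() {
        return indexData.size();
    }

    private void add(Object value, long id) {
        Set<Long> ids = indexData.get(value);
        if (ids == null) {
            ids = new HashSet<Long>();
            indexData.put(value, ids);
        }
        ids.add(id);
    }

    private void remove(Object value, long id) {
        Set<Long> ids = indexData.get(value);
        if (ids == null) return;
        ids.remove(id);
        // drop the key once its last node id is gone
        if (ids.isEmpty()) indexData.remove(value);
    }
}
```

Feeding it an added, a changed and a removed update for the same property value shows how node ids move between the per-value sets and how an emptied set disappears from the map.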
The IndexReader must supply a repeatable-read view of the data, which it gets from MapDB’s treeMap.snapshot() facility. So implementing the MapDbIndexReader is not complicated.
// in MapDbIndex
@Override
public IndexReader newReader() {
    return new MapDbIndexReader((BTreeMap<Object, Set<Long>>) indexData.snapshot());
}

private static class MapDbIndexReader implements IndexReader {
    private final BTreeMap<Object, Set<Long>> snapshot;

    MapDbIndexReader(BTreeMap<Object, Set<Long>> snapshot) {
        this.snapshot = snapshot;
    }

    @Override
    public Iterator<Long> lookup(Object value) {
        final Set<Long> result = snapshot.get(value);
        return result == null ? IteratorUtil.<Long>emptyIterator() : result.iterator();
    }
}
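What repeatable read means for a reader can be shown with plain JDK collections. The sketch below is an illustration under stated assumptions: SnapshotReaderSketch and its nested Reader are hypothetical names, and the snapshot is a simple deep copy of a java.util.TreeMap, which is not how MapDB implements snapshot() but gives the reader the same guarantee.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, stdlib-only illustration of a reader working on a frozen view.
final class SnapshotReaderSketch {
    private final NavigableMap<String, Set<Long>> live = new TreeMap<String, Set<Long>>();

    void add(String value, long nodeId) {
        Set<Long> ids = live.get(value);
        if (ids == null) {
            ids = new HashSet<Long>();
            live.put(value, ids);
        }
        ids.add(nodeId);
    }

    // Copy-based snapshot: deep-copies the map so later writes stay invisible to the reader.
    Reader newReader() {
        NavigableMap<String, Set<Long>> copy = new TreeMap<String, Set<Long>>();
        for (Map.Entry<String, Set<Long>> entry : live.entrySet()) {
            copy.put(entry.getKey(), new HashSet<Long>(entry.getValue()));
        }
        return new Reader(copy);
    }

    static final class Reader {
        private final NavigableMap<String, Set<Long>> snapshot;

        Reader(NavigableMap<String, Set<Long>> snapshot) {
            this.snapshot = snapshot;
        }

        Iterator<Long> lookup(String value) {
            Set<Long> result = snapshot.get(value);
            return result == null ? Collections.<Long>emptyIterator() : result.iterator();
        }

        // Convenience helper: number of node ids visible to this reader for a value.
        int count(String value) {
            Set<Long> result = snapshot.get(value);
            return result == null ? 0 : result.size();
        }
    }
}
```

A reader created before a write keeps returning the same results for as long as it lives, while a reader created afterwards sees the new entries. Copying the whole map costs time and memory proportional to the index size, which is one reason to prefer a store with built-in snapshot support.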
That’s about it.
You just have to clone the repository, build it with mvn package, and put the jar file target/mapdb-index-1.0.jar as well as org.mapdb:mapdb:jar:0.9.1 on your classpath or in your server/plugins directory to use the index. For both you can also just use the contents of the generated target/target/mapdb-index-1.0-provider.zip.
So far in my tests it was about twice as fast as Lucene, but there is certainly potential for optimization.
In general it was really simple to implement the index provider, so I suggest you go ahead and try it for other NoSQL stores. I would really love to see some other implementations out there.




