Skip to content

Commit

Permalink
Super Node: Add proxy node type
Browse files Browse the repository at this point in the history
This aims to fix the super node problem by introducing proxy
nodes (which works like the partition node before), except that
we introduce proxy nodes in application layer only when needed.
This commit focuses on how to make users a seamless use experience
as if the proxy node does not exist.

What is not clear right now is, if A is a super node, and A connects
to a number of vertices with different labels (and possibly different
properties like timestamp). Let's say we have a proxy node for A, Vpa,
then how many edges should we create between Va and Vpa?

1) Create 0 edge between Va and proxies. We store id(Vpa) as a vertex property
in Va. Every time we need to traverse from Va, we always fetch Vpa, and
do the traversal from there. Let's say Va -.-.-.-> Vpa -------> Vb, then
when we traverse from Vb, we will find Vpa first, and then we retrieve Va
because Vpa is just a proxy for Va. This is very similar if not exactly the
same as vertex-cut partition in JanusGraph.

2) Create 1 edge between Va and each Vpa. No specific benefit; drawback is, this
edge will be queried, which means we need to remove this edge.

3) Create 1 edge per label between Va and and each Vpa. Benefit is that for queries
with label constraint that does not apply here, we don't even need to traverse
Vpa because it won't be found with this label. Drawback is also very obvious,
if Vpa ---label1---> Vb1, and Vpa ---label2---> Vb2, then if we traverse from
Vb1 to Va, we will see two edges, unless we somehow see that we use label1 to
go from Vb1 to Vpa thus we shall only use label1 to go from Vpa to Va. This is
very cumbersome so we shall not use it. This commit actually uses 3) which makes
it very difficult to pass all tests.

4) Create one proxy node for a particular edge type (label + props). Suppose edges are all very
similar except for a few properties. Then for each unique edge type, we create a
proxy node. Benefit is we can fully utilize VCI because these edges actually
represent all physical edges. Drawback is this is very application-specific.
For example, if we only need rundate in traversal, then one or more proxy
nodes will represent a particular rundate. If original query is g.V(Va).outE().has("rundate", "20210512").inV(), then we can utilize VCI for rundate on Va. Note that there is
only 1 edge between Va and each proxy node.

Another big challenge of 2, 3, and 4 is how to control edge traversal. If Vpa connects
to Vb1 and Vb2, we must avoid wrong traversal from Vb1 to Vb2 via Vpa (when the user
aims to traverse from Vb1 to Va), which is difficult in practice.

This commit uses option (1): Create proxy nodes but don't draw connection between canonical node and proxy node

Signed-off-by: Boxuan Li <[email protected]>
  • Loading branch information
li-boxuan committed May 14, 2024
1 parent 4bddfb4 commit aa9791f
Show file tree
Hide file tree
Showing 13 changed files with 328 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -8846,6 +8848,86 @@ public void performReindexAndVerifyEdgeCount(String indexName, String edgeLabel,
}
}

@Test
public void testNeighborCountWithProxyNode() {
mgmt.makePropertyKey("proxies").dataType(Long.class).cardinality(Cardinality.LIST).make();
mgmt.makeVertexLabel("proxy").setProxy().make();
mgmt.commit();

newTx();

Vertex v0 = graph.addVertex("vertexId", "v0");
Vertex v1 = graph.addVertex("vertexId", "v1");
Vertex v2 = graph.addVertex("vertexId", "v2");
Vertex v3a = graph.addVertex("vertexId", "v3a");
Vertex v3b = graph.addVertex("vertexId", "v3b");
Vertex v4 = graph.addVertex("vertexId", "v4");

v0.addEdge("normal-edge", v1);
v2.addEdge("normal-edge", v3a);
v1.addEdge("labelX", v4);

// assume now v1 becomes a super node and we decide to introduce proxy node(s). The previous edges are retained.
// assume the application logic adds an edge from v1 to v2 with labelX, another edge from v1 to v3a with labelY,
// and another edge from v1 to v3b with labelY.
// what happens under the hood is v1 connects to v2/v3a/v3b via vProxy.
Vertex vProxy = graph.addVertex(T.label, "proxy", "canonicalId", v1.id());
vProxy.addEdge("labelX", v2, "runDate", "01");
vProxy.addEdge("labelY", v3a, "runDate", "02");
vProxy.addEdge("labelY", v3b, "runDate", "02");
// be careful: we need to make sure we assign id right after vertex is created
v1.property("proxies", vProxy.id());
graph.tx().commit();

// ensure proxy node is not involved when we retrieve canonical node's properties
assertEquals(ImmutableList.of("v1"), graph.traversal().V(v1).values("vertexId").toList());

// queries without label constraints
assertEquals(4, graph.traversal().V(v1).out().count().next());
assertEquals(4, graph.traversal().V(v1).out().toList().size());
assertEquals(ImmutableSet.of("v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v1).out().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v1).outE().inV().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v0", "v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v1).both().values("vertexId").toList()));
assertTrue(graph.traversal().V(v1).outE().where(__.otherV().has("vertexId", "v2")).hasNext());

assertEquals(2, graph.traversal().V(v3a).in().count().next());
assertEquals(2, graph.traversal().V(v3a).both().count().next());
assertEquals(0, graph.traversal().V(v3a).out().count().next());
assertEquals(2, graph.traversal().V(v3a).inE().count().next());
assertEquals(2, graph.traversal().V(v3a).bothE().count().next());
assertEquals(0, graph.traversal().V(v3a).outE().count().next());
assertEquals(ImmutableSet.of("v1", "v2"), new HashSet<>(graph.traversal().V(v3a).in().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1", "v2"), new HashSet<>(graph.traversal().V(v3a).both().values("vertexId").toList()));

assertEquals(ImmutableSet.of("v1", "v3a"), new HashSet<>(graph.traversal().V(v2).both().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v2).in().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a"), new HashSet<>(graph.traversal().V(v2).out().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a"), new HashSet<>(graph.traversal().V(v2).outE().inV().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a"), new HashSet<>(graph.traversal().V(v2).outE().otherV().values("vertexId").toList()));

// queries with label constraints
assertEquals(ImmutableSet.of("v2", "v4"), new HashSet<>(graph.traversal().V(v1).out("labelX").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a", "v3b"), new HashSet<>(graph.traversal().V(v1).out("labelY").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2", "v4"), new HashSet<>(graph.traversal().V(v1).both("labelX").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v3a", "v3b"), new HashSet<>(graph.traversal().V(v1).both("labelY").values("vertexId").toList()));
assertEquals(2, graph.traversal().V(v1).out("labelX").count().next());
assertEquals(2, graph.traversal().V(v1).out("labelY").count().next());

assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v3a).both("labelY").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2"), new HashSet<>(graph.traversal().V(v3a).both("normal-edge").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v3a).in("labelY").values("vertexId").toList()));
assertEquals(ImmutableSet.of("v1"), new HashSet<>(graph.traversal().V(v3a).inE("labelY").outV().values("vertexId").toList()));
assertEquals(ImmutableSet.of("v2"), new HashSet<>(graph.traversal().V(v3a).in("normal-edge").values("vertexId").toList()));
assertEquals(4, graph.traversal().V(v3a).in("labelY").out().count().next());
assertEquals(4, graph.traversal().V(v3a).in("labelY").out().toList().size());
assertEquals(4, graph.traversal().V(v3a).in("labelY").out().values("vertexId").toList().size());
assertEquals(ImmutableSet.of("v2", "v3a", "v3b", "v4"), new HashSet<>(graph.traversal().V(v3a).in("labelY").out().values("vertexId").toList()));

assertEquals("02", graph.traversal().V(v3a).inE("labelY").where(__.otherV().has("vertexId", "v1")).next().property("runDate").value());


}

@Test
public void testMultipleOrClauses() {
clopen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public interface VertexLabel extends JanusGraphVertex, JanusGraphSchemaType {
*/
boolean isStatic();

boolean isProxy();

//TTL

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface VertexLabelMaker {
*/
VertexLabelMaker setStatic();

VertexLabelMaker setProxy();

/**
* Creates a {@link VertexLabel} according to the specifications of this builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphMixedIndexAggStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphMixedIndexCountStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphMultiQueryStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphProxyTraversalStrategy;
import org.janusgraph.graphdb.tinkerpop.optimize.strategy.JanusGraphStepStrategy;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.transaction.StandardTransactionBuilder;
Expand Down Expand Up @@ -160,7 +161,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
JanusGraphMixedIndexAggStrategy.instance(),
JanusGraphMixedIndexCountStrategy.instance(),
JanusGraphStepStrategy.instance(),
JanusGraphIoRegistrationStrategy.instance());
JanusGraphIoRegistrationStrategy.instance(),
JanusGraphProxyTraversalStrategy.instance());

//Register with cache
TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraph.class, graphStrategies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ private static IDManager.VertexIDType getVertexIDType(VertexLabel vertexLabel) {
return IDManager.VertexIDType.PartitionedVertex;
} else if (vertexLabel.isStatic()) {
return IDManager.VertexIDType.UnmodifiableVertex;
} else if (vertexLabel.isProxy()) {
return IDManager.VertexIDType.ProxyVertex;
} else {
return IDManager.VertexIDType.NormalVertex;
}
Expand All @@ -403,6 +405,7 @@ private class SimpleVertexIDBlockSizer implements IDBlockSizer {
public long getBlockSize(int idNamespace) {
switch (PoolType.getPoolType(idNamespace)) {
case NORMAL_VERTEX:
case PROXY_VERTEX:
return baseBlockSize;
case UNMODIFIABLE_VERTEX:
return Math.max(10,baseBlockSize/10);
Expand All @@ -426,7 +429,7 @@ public long getIdUpperBound(int idNamespace) {

private enum PoolType {

NORMAL_VERTEX, UNMODIFIABLE_VERTEX, PARTITIONED_VERTEX, RELATION, SCHEMA;
NORMAL_VERTEX, PROXY_VERTEX, UNMODIFIABLE_VERTEX, PARTITIONED_VERTEX, RELATION, SCHEMA;

public int getIDNamespace() {
return ordinal();
Expand All @@ -437,6 +440,7 @@ public long getCountBound(IDManager idManager) {
case NORMAL_VERTEX:
case UNMODIFIABLE_VERTEX:
case PARTITIONED_VERTEX:
case PROXY_VERTEX:
return idManager.getVertexCountBound();
case RELATION: return idManager.getRelationCountBound();
case SCHEMA: return IDManager.getSchemaCountBound();
Expand All @@ -449,6 +453,7 @@ public boolean hasOnePerPartition() {
case NORMAL_VERTEX:
case UNMODIFIABLE_VERTEX:
case RELATION:
case PROXY_VERTEX:
return true;
default: return false;
}
Expand All @@ -458,6 +463,7 @@ public static PoolType getPoolTypeFor(IDManager.VertexIDType idType) {
if (idType==IDManager.VertexIDType.NormalVertex) return NORMAL_VERTEX;
else if (idType== IDManager.VertexIDType.UnmodifiableVertex) return UNMODIFIABLE_VERTEX;
else if (idType== IDManager.VertexIDType.PartitionedVertex) return PARTITIONED_VERTEX;
else if (idType== IDManager.VertexIDType.ProxyVertex) return PROXY_VERTEX;
else if (IDManager.VertexIDType.Schema.isSubType(idType)) return SCHEMA;
else throw new IllegalArgumentException("Invalid id type: " + idType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class IDManager {
* 000 - * Normal vertices
* 010 - * Partitioned vertices
* 100 - * Unmodifiable (e.g. TTL'ed) vertices
* 110 - + Reserved for additional vertex type
* 110 - * Proxy vertices
* 1 - + Invisible
* 11 - * Invisible (user created/triggered) Vertex [for later]
* 01 - + Schema related vertices
Expand Down Expand Up @@ -125,6 +125,22 @@ final boolean isProper() {
return true;
}
},
ProxyVertex {
@Override
final long offset() {
return 3L;
}

@Override
final long suffix() {
return 6L;
}

@Override
final boolean isProper() {
return true;
}
},
Invisible {
@Override
final long offset() {
Expand Down Expand Up @@ -463,6 +479,7 @@ private static VertexIDType getUserVertexIDType(Object vertexId) {
if (VertexIDType.NormalVertex.is(vertexId)) type=VertexIDType.NormalVertex;
else if (VertexIDType.PartitionedVertex.is(vertexId)) type=VertexIDType.PartitionedVertex;
else if (VertexIDType.UnmodifiableVertex.is(vertexId)) type=VertexIDType.UnmodifiableVertex;
else if (VertexIDType.ProxyVertex.is(vertexId)) type=VertexIDType.ProxyVertex;
if (null == type) {
throw new InvalidIDException("Vertex ID " + vertexId + " has unrecognized type");
}
Expand All @@ -471,7 +488,7 @@ private static VertexIDType getUserVertexIDType(Object vertexId) {

public final boolean isUserVertexId(Object vertexId) {
if (vertexId instanceof Number) {
return (VertexIDType.NormalVertex.is(vertexId) || VertexIDType.PartitionedVertex.is(vertexId) || VertexIDType.UnmodifiableVertex.is(vertexId))
return (VertexIDType.NormalVertex.is(vertexId) || VertexIDType.PartitionedVertex.is(vertexId) || VertexIDType.UnmodifiableVertex.is(vertexId) || VertexIDType.ProxyVertex.is(vertexId))
&& ((((Number) vertexId).longValue() >>> (partitionBits+USERVERTEX_PADDING_BITWIDTH))>0);
} else {
return true;
Expand Down Expand Up @@ -639,6 +656,10 @@ public boolean isPartitionedVertex(Object id) {
return isUserVertexId(id) && VertexIDType.PartitionedVertex.is(id);
}

public boolean isProxyVertex(Object id) {
return isUserVertexId(id) && VertexIDType.ProxyVertex.is(id);
}

public long getRelationCountBound() {
return relationCountBound;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ public boolean equals(Object other) {
public String toString() {
return "type["+ relationType.toString()+"]";
}

public RelationType getRelationType() {
return this.relationType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.janusgraph.core.BaseVertexQuery;
import org.janusgraph.core.JanusGraphEdge;
import org.janusgraph.core.JanusGraphRelation;
Expand Down Expand Up @@ -53,8 +54,10 @@
import org.janusgraph.graphdb.relations.RelationComparator;
import org.janusgraph.graphdb.relations.StandardVertexProperty;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.system.BaseLabel;
import org.janusgraph.graphdb.types.system.ImplicitKey;
import org.janusgraph.graphdb.types.system.SystemRelationType;
import org.janusgraph.graphdb.util.CloseableAbstractIterator;
import org.janusgraph.graphdb.vertices.CacheVertex;
import org.janusgraph.util.datastructures.Interval;
import org.janusgraph.util.datastructures.PointInterval;
Expand All @@ -69,6 +72,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -340,7 +344,8 @@ protected Iterable<JanusGraphRelation> executeRelations(InternalVertex vertex, B
return ResultSetIterator.wrap(merge,baseQuery.getLimit());
} else vertex = tx.getCanonicalVertex(vertex);
}
return executeIndividualRelations(vertex,baseQuery);
Iterator iter = new EdgeProxyProcessor(vertex, baseQuery);
return () -> iter;
}

/**
Expand Down Expand Up @@ -374,7 +379,7 @@ protected Iterable<JanusGraphRelation> loadRelationsFromCache(InternalVertex ver
merge = ResultMergeSortIterator.mergeSort(merge, iterable, relationComparator, false);
}
}

return ResultSetIterator.wrap(merge, baseQuery.getLimit());
}
} else {
Expand Down Expand Up @@ -413,7 +418,8 @@ public Iterable<JanusGraphVertex> executeVertices(InternalVertex vertex, BaseVer
return ResultSetIterator.wrap(merge,baseQuery.getLimit());
} else vertex = tx.getCanonicalVertex(vertex);
}
return executeIndividualVertices(vertex,baseQuery);
Iterator iter = new VertexProxyProcessor(vertex, baseQuery);
return () -> iter;
}

private Iterable<JanusGraphVertex> executeIndividualVertices(InternalVertex vertex,
Expand Down Expand Up @@ -859,5 +865,79 @@ private int computeLimit(int remainingConditions, int baseLimit) {
return baseLimit;
}

// TODO: we should not use CloseableAbstractIterator here since it is unmodifiable
public class VertexProxyProcessor extends CloseableAbstractIterator<JanusGraphVertex> {

private List<Long> proxyIds = new ArrayList<>();
private int offset;
private Iterator<JanusGraphVertex> iter;
private BaseVertexCentricQuery baseQuery;

public VertexProxyProcessor(InternalVertex vertex, BaseVertexCentricQuery baseQuery) {
Condition condition = baseQuery.getCondition();
if (condition instanceof And && ((RelationCategory) ((And) condition).get(0)).name().equals(RelationCategory.EDGE.name())
|| condition instanceof RelationTypeCondition && ((RelationTypeCondition) condition).getRelationType().isEdgeLabel()
&& !(((RelationTypeCondition) condition).getRelationType() instanceof BaseLabel)) {
Iterator<VertexProperty<Long>> proxyIter = vertex.properties("proxies");
while (proxyIter.hasNext()) {
proxyIds.add(proxyIter.next().value());
}
}
iter = executeIndividualVertices(vertex, baseQuery).iterator();
this.baseQuery = baseQuery;
}

@Override
protected JanusGraphVertex computeNext() {
if (iter.hasNext()) {
JanusGraphVertex v = iter.next();
if (tx.getIdInspector().isProxyVertex(v.id())) {
long canonicalId = (long) v.property("canonicalId").value();
JanusGraphVertex canonicalV = tx.getVertex(canonicalId);
return canonicalV;
}
return v;
} else if (offset < proxyIds.size()) {
InternalVertex proxyV = (InternalVertex) tx.getVertex(proxyIds.get(offset++));
iter = executeIndividualVertices(proxyV, baseQuery).iterator();
return computeNext();
}
return endOfData();
}
}

public class EdgeProxyProcessor extends CloseableAbstractIterator<JanusGraphRelation> {

private List<Long> proxyIds = new ArrayList<>();
private int offset;
private Iterator<JanusGraphRelation> iter;
private BaseVertexCentricQuery baseQuery;

public EdgeProxyProcessor(InternalVertex vertex, BaseVertexCentricQuery baseQuery) {
Condition condition = baseQuery.getCondition();
if (condition instanceof And && ((RelationCategory) ((And) condition).get(0)).name() == RelationCategory.EDGE.name()
|| condition instanceof RelationTypeCondition && ((RelationTypeCondition) condition).getRelationType().isEdgeLabel()
&& !(((RelationTypeCondition) condition).getRelationType() instanceof BaseLabel)) {
Iterator<VertexProperty<Long>> proxyIter = vertex.properties("proxies");
while (proxyIter.hasNext()) {
proxyIds.add(proxyIter.next().value());
}
}
iter = executeIndividualRelations(vertex, baseQuery).iterator();
this.baseQuery = baseQuery;
}

@Override
protected JanusGraphRelation computeNext() {
if (iter.hasNext()) {
return iter.next();
} else if (offset < proxyIds.size()) {
InternalVertex proxyV = (InternalVertex) tx.getVertex(proxyIds.get(offset++));
iter = executeIndividualRelations(proxyV, baseQuery).iterator();
return computeNext();
}
return endOfData();
}
}

}
Loading

0 comments on commit aa9791f

Please sign in to comment.