75 changes: 50 additions & 25 deletions arango_rdf/main.py
@@ -1456,33 +1456,41 @@ def migrate_unknown_resources(
def migrate_edges_to_attributes(
self,
graph_name: str,
edge_collection_name: str,
edge_path: list[str],
attribute_name: Optional[str] = None,
edge_direction: str = "OUTBOUND",
max_depth: int = 1,
sort_clause: Optional[str] = None,
return_clause: Optional[str] = None,
filter_clause: Optional[str] = None,
traversal_options: Optional[dict[str, Any]] = None,
) -> int:
"""RDF --> ArangoDB (PGT): Migrate all edges in the specified edge collection to
attributes. This method is useful when combined with the
**resource_collection_name** parameter of the :func:`rdf_to_arangodb_by_pgt`
method.

NOTE: It is recommended to run this method with **edge_collection_name** set
to **"type"** after :func:`rdf_to_arangodb_by_pgt` if the user has set the
NOTE: It is recommended to run this method with **edge_path** set
to **["type"]** after :func:`rdf_to_arangodb_by_pgt` if the user has set the
**resource_collection_name** parameter.

:param graph_name: The name of the graph to migrate the edges from.
:type graph_name: str
:param edge_collection_name: The name of the edge collection to migrate.
:type edge_collection_name: str
:param edge_path: The path of edge collections to traverse. The first element is
the starting edge collection; the last element is the ending edge collection.
Each element may also include a per-hop traversal direction
(e.g. ["OUTBOUND type", "OUTBOUND subClassOf"]).
:type edge_path: list[str]
:param edge_direction: The default traversal direction, applied to every
**edge_path** entry that does not specify its own. Defaults to **OUTBOUND**.
:type edge_direction: str
:param max_depth: The maximum depth of the edge path to migrate.
Defaults to 1.
:type max_depth: int
:param attribute_name: The name of the attribute to migrate the edges to.
Defaults to **edge_collection_name**, prefixed with the
Defaults to **edge_path[0]**, prefixed with the
**rdf_attribute_prefix** parameter set in the constructor.
:type attribute_name: Optional[str]
:param edge_direction: The direction of the edges to migrate.
Defaults to **OUTBOUND**.
:type edge_direction: str
:param sort_clause: A SORT statement to order the traversed vertices.
Defaults to f"v.{self.__rdf_attribute_prefix}label". If set to None,
the vertex values will be ordered based on their traversal order.
@@ -1495,6 +1503,9 @@ def migrate_edges_to_attributes(
:param filter_clause: A FILTER statement to filter the traversed
edges & target vertices. Defaults to None.
:type filter_clause: Optional[str]
:param traversal_options: A dictionary of traversal options to pass to the
AQL query. Defaults to **{"uniqueVertices": "path", "uniqueEdges": "path"}**.
:type traversal_options: Optional[dict[str, Any]]
:return: The number of documents updated.
:rtype: int
"""
@@ -1507,35 +1518,51 @@ def migrate_edges_to_attributes(

graph = self.db.graph(graph_name)

target_e_d = {}
# Remove any INBOUND/OUTBOUND/ANY prefix
# (e.g. ["OUTBOUND type", "OUTBOUND subClassOf"])
edge_path_cleaned = [e_col.split(" ")[-1] for e_col in edge_path]
start_edge_collection = edge_path_cleaned[0]

start_node_collections = []
all_e_ds = []
for e_d in graph.edge_definitions():
if e_d["edge_collection"] == edge_collection_name:
target_e_d = e_d
break
if e_d["edge_collection"] == start_edge_collection:
start_node_collections = e_d["from_vertex_collections"]

if not target_e_d:
m = f"No edge definition found for '{edge_collection_name}' in graph '{graph_name}'. Cannot migrate edges to attributes." # noqa: E501
if e_d["edge_collection"] in edge_path_cleaned:
all_e_ds.append(e_d)

if not all_e_ds:
m = f"No edge definitions found for '{edge_path}' in graph '{graph_name}'. Cannot migrate edges to attributes." # noqa: E501
raise ValueError(m)

if not attribute_name:
attribute_name = f"{self.__rdf_attribute_prefix}{edge_collection_name}"
if attribute_name is None:
attribute_name = f"{self.__rdf_attribute_prefix}{start_edge_collection}"

if not sort_clause:
if sort_clause is None:
sort_clause = f"v.{self.__rdf_label_attr}"

if not return_clause:
if return_clause is None:
return_clause = f"v.{self.__rdf_label_attr}"

with_cols = set(target_e_d["to_vertex_collections"])
if traversal_options is None:
traversal_options = {
"uniqueVertices": "path",
"uniqueEdges": "path",
}

with_cols = {col for e_d in all_e_ds for col in e_d["to_vertex_collections"]}
with_cols_str = "WITH " + ", ".join(with_cols)
e_cols = ", ".join(edge_path_cleaned)

count = 0
for v_col in target_e_d["from_vertex_collections"]:
for v_col in start_node_collections:
query = f"""
{with_cols_str}
FOR doc IN @@v_col
LET labels = (
FOR v, e IN 1 {edge_direction} doc @@e_col
FOR v, e IN 1..{max_depth} {edge_direction} doc {e_cols}
OPTIONS {json.dumps(traversal_options)}
{f"FILTER {filter_clause}" if filter_clause else ""}
{f"SORT {sort_clause}" if sort_clause else ""}
RETURN {return_clause}
@@ -1544,9 +1571,7 @@ def migrate_edges_to_attributes(
UPDATE doc WITH {{{attribute_name}: labels}} IN @@v_col
"""

self.db.aql.execute(
query, bind_vars={"@v_col": v_col, "@e_col": edge_collection_name}
)
self.db.aql.execute(query, bind_vars={"@v_col": v_col})

count += self.db.collection(v_col).count()
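
For context, a minimal usage sketch of the updated signature. The graph name "Test" and the edge collections "type"/"subClassOf" mirror the new tests below and are illustrative only; an ArangoRDF instance named ``adbrdf`` connected to the target database is assumed:

# Sketch only: single-hop migration of "type" edges into the default
# "_type" attribute on each starting vertex.
adbrdf.migrate_edges_to_attributes("Test", ["type"])

# Sketch only: follow "type" and then "subClassOf" edges up to two hops
# deep, making the default traversal options explicit.
adbrdf.migrate_edges_to_attributes(
    graph_name="Test",
    edge_path=["type", "subClassOf"],
    max_depth=2,
    traversal_options={"uniqueVertices": "path", "uniqueEdges": "path"},
)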

25 changes: 25 additions & 0 deletions docs/rdf_to_arangodb_lpg.rst
@@ -21,6 +21,7 @@ Consider the following RDF graph:
.. code-block:: turtle

@prefix ex: <http://example.com/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

ex:Alice a ex:Person ;
ex:name "Alice" ;
@@ -32,6 +33,8 @@ Consider the following RDF graph:

ex:Alice ex:friend ex:Bob .

ex:Person rdfs:subClassOf ex:Human .

Running the LPG transformation produces a graph with:

* **2 vertices** in the ``Node`` collection (``ex:Alice`` & ``ex:Bob``)
@@ -80,6 +83,28 @@ After the migration each vertex has an ``_type`` array property –
``["Person"]`` in this example – and the original ``rdf:type`` edges remain untouched.
Delete them if you do not need them any more.

In addition to single-hop migration, the **edge_path** and **max_depth** parameters make it
possible to traverse multiple edge collections in sequence and apply the values collected at
deeper levels to the original starting vertices. In PGT, a common use case is to set
**edge_path** to ``["type", "subClassOf"]`` to infer the **_type** attribute across the class
hierarchy, as sketched below.

In LPG, where all edges live in a single ``Edge`` collection, the same effect can be achieved
with **max_depth** and a **filter_clause** on the edge labels:

.. code-block:: python

    adbrdf.migrate_edges_to_attributes(
        graph_name="DemoGraph",
        edge_path=["Edge"],
        attribute_name="_type",
        max_depth=2,
        filter_clause="e._label IN ['type', 'subClassOf']",
    )

After this migration, the ``_type`` attribute of ``ex:Alice`` and ``ex:Bob`` will contain both ``"Person"`` and ``"Human"``.
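
To verify the result, one can fetch a vertex and inspect the attribute (a sketch that assumes
the ``db`` handle and the URI hashing helper used in this project's test suite):

.. code-block:: python

    alice = db.collection("Node").get(adbrdf.hash("http://example.com/Alice"))
    assert set(alice["_type"]) == {"Person", "Human"}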


LPG Collection Mapping Process
==============================

142 changes: 127 additions & 15 deletions tests/test_main.py
@@ -5442,7 +5442,7 @@ def test_pgt_resource_collection_name_and_set_types_attribute() -> None:
for node in db.collection("Node"):
assert "_type" not in node

count = adbrdf.migrate_edges_to_attributes("Test", "type")
count = adbrdf.migrate_edges_to_attributes("Test", ["type"])

node_col = db.collection("Node")
assert set(node_col.get(adbrdf.hash("http://example.com/Alice"))["_type"]) == {
@@ -5475,7 +5475,7 @@ def test_pgt_resource_collection_name_and_set_types_attribute() -> None:
for v in db.collection("Company"):
assert "_type" not in v

count = adbrdf.migrate_edges_to_attributes("Test", "type", "foo")
count = adbrdf.migrate_edges_to_attributes("Test", ["type"], "foo")
assert count == 3

for v in db.collection("Human"):
@@ -5484,9 +5484,7 @@ def test_pgt_resource_collection_name_and_set_types_attribute() -> None:
for v in db.collection("Company"):
assert set(v["foo"]) == {"Organization", "Company"}

count = adbrdf.migrate_edges_to_attributes(
graph_name="Test", edge_collection_name="friend"
)
count = adbrdf.migrate_edges_to_attributes(graph_name="Test", edge_path=["friend"])

alice = db.collection("Human").get(adbrdf.hash("http://example.com/Alice"))
assert alice["_friend"] == ["Bob"]
@@ -5497,7 +5495,7 @@ def test_pgt_resource_collection_name_and_set_types_attribute() -> None:
assert count == 2

count = adbrdf.migrate_edges_to_attributes(
graph_name="Test", edge_collection_name="friend", edge_direction="ANY"
graph_name="Test", edge_path=["friend"], edge_direction="ANY"
)

assert count == 2
@@ -5510,23 +5508,19 @@ def test_pgt_resource_collection_name_and_set_types_attribute() -> None:

with pytest.raises(ValueError) as e:
adbrdf.migrate_edges_to_attributes(
graph_name="Test", edge_collection_name="friend", edge_direction="INVALID"
graph_name="Test", edge_path=["friend"], edge_direction="INVALID"
)

assert "Invalid edge direction: INVALID" in str(e.value)

with pytest.raises(ValueError) as e:
adbrdf.migrate_edges_to_attributes(
graph_name="Test", edge_collection_name="INVALID"
)
adbrdf.migrate_edges_to_attributes(graph_name="Test", edge_path=["INVALID"])

m = "No edge definition found for 'INVALID' in graph 'Test'. Cannot migrate edges to attributes." # noqa: E501
m = "No edge definitions found for '['INVALID']' in graph 'Test'. Cannot migrate edges to attributes." # noqa: E501
assert m in str(e.value)

with pytest.raises(ValueError) as e:
adbrdf.migrate_edges_to_attributes(
graph_name="INVALID", edge_collection_name="friend"
)
adbrdf.migrate_edges_to_attributes(graph_name="INVALID", edge_path=["friend"])

assert "Graph 'INVALID' does not exist" in str(e.value)

@@ -5627,7 +5621,7 @@ def test_lpg() -> None:
assert "_type" not in node

adbrdf.migrate_edges_to_attributes(
"Test", "Edge", "_type", filter_clause="e._label == 'type'"
"Test", ["Edge"], "_type", filter_clause="e._label == 'type'"
)

for node in db.collection("Node"):
@@ -5702,3 +5696,121 @@ def import_rdf(graph_name: str, rdf_graph: RDFGraph) -> str:
assert db.collection("Node").count() == 3
assert db.collection("Property").count() == 2
assert db.collection("knows").count() == 2


def test_migrate_edges_to_attributes_max_depth() -> None:
db.delete_graph("Test", drop_collections=True, ignore_missing=True)

g = RDFGraph()
g.parse(
data="""
@prefix ex: <http://example.com/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

ex:Alice a ex:Human .

ex:Bob a ex:Person .

ex:Charlie a ex:Animal .

ex:Dana a ex:Entity .

ex:Eve a ex:Human .
ex:Eve a ex:Person .

ex:Fred a ex:Human .
ex:Fred a ex:Individual .

ex:Human rdfs:subClassOf ex:Animal .
ex:Person rdfs:subClassOf ex:Individual .
ex:Animal rdfs:subClassOf ex:Entity .
ex:Individual rdfs:subClassOf ex:Entity .
""",
format="turtle",
)

adbrdf.rdf_to_arangodb_by_pgt("Test", g, resource_collection_name="Node")

assert db.collection("subClassOf").count() == 4

adbrdf.migrate_edges_to_attributes(
graph_name="Test",
edge_path=["type", "subClassOf"],
max_depth=1,
)

alice = db.collection("Node").get(adbrdf.hash("http://example.com/Alice"))
assert set(alice["_type"]) == {"Human"}

bob = db.collection("Node").get(adbrdf.hash("http://example.com/Bob"))
assert set(bob["_type"]) == {"Person"}

charlie = db.collection("Node").get(adbrdf.hash("http://example.com/Charlie"))
assert set(charlie["_type"]) == {"Animal"}

dana = db.collection("Node").get(adbrdf.hash("http://example.com/Dana"))
assert set(dana["_type"]) == {"Entity"}

eve = db.collection("Node").get(adbrdf.hash("http://example.com/Eve"))
assert set(eve["_type"]) == {"Human", "Person"}

fred = db.collection("Node").get(adbrdf.hash("http://example.com/Fred"))
assert set(fred["_type"]) == {"Human", "Individual"}

db.delete_graph("Test", drop_collections=True)

adbrdf.rdf_to_arangodb_by_pgt("Test", g, resource_collection_name="Node")

adbrdf.migrate_edges_to_attributes(
graph_name="Test",
edge_path=["type", "subClassOf"],
max_depth=2,
)

alice = db.collection("Node").get(adbrdf.hash("http://example.com/Alice"))
assert set(alice["_type"]) == {"Human", "Animal"}

bob = db.collection("Node").get(adbrdf.hash("http://example.com/Bob"))
assert set(bob["_type"]) == {"Person", "Individual"}

charlie = db.collection("Node").get(adbrdf.hash("http://example.com/Charlie"))
assert set(charlie["_type"]) == {"Animal", "Entity"}

dana = db.collection("Node").get(adbrdf.hash("http://example.com/Dana"))
assert set(dana["_type"]) == {"Entity"}

eve = db.collection("Node").get(adbrdf.hash("http://example.com/Eve"))
assert set(eve["_type"]) == {"Human", "Person", "Animal", "Individual"}

fred = db.collection("Node").get(adbrdf.hash("http://example.com/Fred"))
assert set(fred["_type"]) == {"Human", "Individual", "Animal", "Entity"}

db.delete_graph("Test", drop_collections=True)

adbrdf.rdf_to_arangodb_by_pgt("Test", g, resource_collection_name="Node")

adbrdf.migrate_edges_to_attributes(
graph_name="Test",
edge_path=["type", "subClassOf"],
max_depth=3,
)

alice = db.collection("Node").get(adbrdf.hash("http://example.com/Alice"))
assert set(alice["_type"]) == {"Human", "Animal", "Entity"}

bob = db.collection("Node").get(adbrdf.hash("http://example.com/Bob"))
assert set(bob["_type"]) == {"Person", "Individual", "Entity"}

charlie = db.collection("Node").get(adbrdf.hash("http://example.com/Charlie"))
assert set(charlie["_type"]) == {"Animal", "Entity"}

dana = db.collection("Node").get(adbrdf.hash("http://example.com/Dana"))
assert set(dana["_type"]) == {"Entity"}

eve = db.collection("Node").get(adbrdf.hash("http://example.com/Eve"))
assert set(eve["_type"]) == {"Human", "Person", "Animal", "Individual", "Entity"}

fred = db.collection("Node").get(adbrdf.hash("http://example.com/Fred"))
assert set(fred["_type"]) == {"Human", "Individual", "Entity", "Animal"}

db.delete_graph("Test", drop_collections=True)