@@ -1,6 +1,9 @@
-from raphtory import DiskGraph, Query, State, PyDirection
+from raphtory import PyDirection, DiskGraphStorage
+from raphtory import algorithms
 import pandas as pd
 import tempfile
+from utils import measure
+import os
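+# Note: `measure` comes from the local test utils; judging by the call sites
+# below, measure(label, fn, *args, print_result=...) times fn(*args) and
+# returns its result.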
 
 edges = pd.DataFrame(
     {
@@ -33,7 +36,7 @@
 
 
 def create_graph(edges, dir):
-    return DiskGraph.load_from_pandas(dir, edges, "src", "dst", "time")
+    return DiskGraphStorage.load_from_pandas(dir, edges, "src", "dst", "time")
 
 
 # in every test, use `with` to create a temporary directory that will be deleted automatically
@@ -42,113 +45,159 @@ def create_graph(edges, dir):
 
 def test_counts():
     dir = tempfile.TemporaryDirectory()
-    graph = create_graph(edges, dir.name)
+    graph = create_graph(edges, dir.name).to_events()
     assert graph.count_nodes() == 5
     assert graph.count_edges() == 20
 
 
-def test_simple_hop():
-    dir = tempfile.TemporaryDirectory()
-    graph = create_graph(edges, dir.name)
-    q = Query.from_node_ids([1]).hop(dir=PyDirection("OUT"), layer=None, limit=100)
-    state = State.path()
-    actual = q.run_to_vec(graph, state)
-
-    actual = [([n2.name, n1.name], n2.name) for ([n2, n1], n2) in actual]
-
-    expected = [
-        (["2", "1"], "2"),
-        (["3", "1"], "3"),
-        (["4", "1"], "4"),
-        (["5", "1"], "5"),
-    ]
-
-    actual.sort()
-    expected.sort()
-
-    assert actual == expected
-
-
-def test_simple_hop_from_node():
-    dir = tempfile.TemporaryDirectory()
-    graph = create_graph(edges, dir.name)
-    node = graph.node(1)
-    q = Query.from_node_ids([node]).out()
-    state = State.path()
-    actual = q.run_to_vec(graph, state)
-
-    actual = [([n2.name, n1.name], n2.name) for ([n2, n1], n2) in actual]
-
-    expected = [
-        (["2", "1"], "2"),
-        (["3", "1"], "3"),
-        (["4", "1"], "4"),
-        (["5", "1"], "5"),
-    ]
-
-    actual.sort()
-    expected.sort()
-
-    assert actual == expected
+def test_disk_graph():
+    curr_dir = os.path.dirname(os.path.abspath(__file__))
+    rsc_dir = os.path.join(curr_dir, "..", "..", "pometry-storage-private", "resources")
+    rsc_dir = os.path.normpath(rsc_dir)
+    print("rsc_dir:", rsc_dir + "/netflowsorted/nft_sorted")
 
-
-def test_double_hop():
-    dir = tempfile.TemporaryDirectory()
-    graph = create_graph(edges, dir.name)
-    q = Query.from_node_ids([1]).out().out()
-    state = State.path()
-    actual = q.run_to_vec(graph, state)
-
-    actual = [([n3.name, n2.name, n1.name], n3.name) for ([n3, n2, n1], n3) in actual]
-
-    expected = [
-        (["1", "5", "1"], "1"),
-        (["2", "4", "1"], "2"),
-        (["5", "3", "1"], "5"),
-        (["2", "5", "1"], "2"),
-        (["4", "2", "1"], "4"),
-        (["4", "3", "1"], "4"),
-        (["1", "4", "1"], "1"),
-        (["3", "2", "1"], "3"),
-        (["3", "4", "1"], "3"),
-        (["5", "2", "1"], "5"),
-        (["1", "2", "1"], "1"),
-        (["5", "4", "1"], "5"),
-        (["2", "3", "1"], "2"),
-        (["1", "3", "1"], "1"),
-        (["3", "5", "1"], "3"),
-        (["4", "5", "1"], "4"),
+    graph_dir = tempfile.TemporaryDirectory()
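+    # One entry per edge layer: each dict points at a directory of sorted
+    # parquet files and names the layer plus its src/dst/time columns.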
+    layer_parquet_cols = [
+        {
+            "parquet_dir": rsc_dir + "/netflowsorted/nft_sorted",
+            "layer": "netflow",
+            "src_col": "src",
+            "dst_col": "dst",
+            "time_col": "epoch_time",
+        },
+        {
+            "parquet_dir": rsc_dir + "/netflowsorted/v1_sorted",
+            "layer": "events_1v",
+            "src_col": "src",
+            "dst_col": "dst",
+            "time_col": "epoch_time",
+        },
+        {
+            "parquet_dir": rsc_dir + "/netflowsorted/v2_sorted",
+            "layer": "events_2v",
+            "src_col": "src",
+            "dst_col": "dst",
+            "time_col": "epoch_time",
+        },
     ]
 
-    actual.sort()
-    expected.sort()
-
-    assert actual == expected
-
-
-def test_hop_twice_forward():
-    dir = tempfile.TemporaryDirectory()
-    edges = pd.DataFrame(
+    # # Read the Parquet file
+    # table = pq.read_table(parquet_dir + '/part-00000-8b31eaa4-2bd9-4f07-b61c-a353aed2af22-c000.snappy.parquet')
+    # print(table.schema)
+
+    print()
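+    # graph_dir is a fresh temporary directory, so the load below is expected
+    # to fail on a first run; the except branch then builds the graph from the
+    # parquet layers instead.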
+    try:
+        g = measure(
+            "Graph load from dir",
+            DiskGraphStorage.load_from_dir,
+            graph_dir,
+            print_result=False,
+        )
+    except Exception as e:
+        chunk_size = 268_435_456
+        num_threads = 4
+        t_props_chunk_size = int(chunk_size / 8)
+        read_chunk_size = 4_000_000
+        concurrent_files = 1
+
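+        # The two `None`s are the optional node-property parquet path and the
+        # node-type column, both unused here (test_disk_graph_type_filter below
+        # fills in both).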
+        g = measure(
+            "Graph load from parquets",
+            DiskGraphStorage.load_from_parquets,
+            graph_dir.name,
+            layer_parquet_cols,
+            None,
+            chunk_size,
+            t_props_chunk_size,
+            read_chunk_size,
+            concurrent_files,
+            num_threads,
+            None,
+            print_result=False,
+        )
+
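+    # to_events() exposes the on-disk storage as an event graph for querying.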
+    g = g.to_events()
+
+    assert g.count_nodes() == 1624
+    assert g.layer("netflow").count_edges() == 2018
+    assert g.earliest_time == 7257601
+    assert g.latest_time == 7343985
+
+    actual = measure(
+        "Weakly CC Layer",
+        algorithms.weakly_connected_components,
+        g.layer("netflow"),
+        20,
+        print_result=False,
+    )
+    assert len(list(actual.get_all_with_names())) == 1624
+
+    # Doesn't work yet: this used to run silently on only the first layer, and
+    # now panics because multilayer edge views are not implemented yet.
+    # actual = measure("Weakly CC", algorithms.weakly_connected_components, g, 20, print_result=False)
+    # assert len(list(actual.get_all_with_names())) == 1624
+
+    actual = measure(
+        "Page Rank", algorithms.pagerank, g.layer("netflow"), 100, print_result=False
+    )
+    assert len(list(actual.get_all_with_names())) == 1624
+
+def test_disk_graph_type_filter():
+    curr_dir = os.path.dirname(os.path.abspath(__file__))
+    rsc_dir = os.path.join(curr_dir, "..", "..", "pometry-storage-private", "resources")
+    rsc_dir = os.path.normpath(rsc_dir)
+    print("rsc_dir:", rsc_dir + "/netflowsorted/nft_sorted")
+
+    graph_dir = tempfile.TemporaryDirectory()
+    layer_parquet_cols = [
         {
-            "src": [0, 0, 1, 1, 3, 3, 3, 4, 4, 4],
-            "dst": [1, 2, 3, 4, 5, 6, 6, 3, 4, 7],
-            "time": [11, 10, 12, 13, 5, 10, 15, 14, 14, 10],
+            "parquet_dir": rsc_dir + "/netflowsorted/nft_sorted",
+            "layer": "netflow",
+            "src_col": "src",
+            "dst_col": "dst",
+            "time_col": "epoch_time",
         }
-    ).sort_values(["src", "dst", "time"])
-    graph = create_graph(edges, dir.name)
-    q = Query.from_node_ids([0, 1]).out().out()
-    state = State.path_window(keep_path=True, start_t=10, duration=100)
-    actual = q.run_to_vec(graph, state)
-
-    actual = [([n3.name, n2.name, n1.name], n3.name) for ([n3, n2, n1], n3) in actual]
-
-    expected = [
-        (["6", "3", "1"], "6"),
-        (["3", "1", "0"], "3"),
-        (["3", "4", "1"], "3"),
-        (["4", "4", "1"], "4"),
-        (["4", "1", "0"], "4"),
     ]
-    actual.sort()
-    expected.sort()
-    assert actual == expected
+
+    chunk_size = 268_435_456
+    num_threads = 4
+    t_props_chunk_size = int(chunk_size / 8)
+    read_chunk_size = 4_000_000
+    concurrent_files = 1
+
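+    # Unlike test_disk_graph, this load also reads props.parquet and its
+    # "node_type" column, giving nodes the "A"/"B" types that the type_filter
+    # assertions below rely on.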
+    g = DiskGraphStorage.load_from_parquets(
+        graph_dir.name,
+        layer_parquet_cols,
+        rsc_dir + "/netflowsorted/props/props.parquet",
+        chunk_size,
+        t_props_chunk_size,
+        read_chunk_size,
+        concurrent_files,
+        num_threads,
+        "node_type",
+    ).to_events()
+
+    assert g.count_nodes() == 1619
+    assert g.layer("netflow").count_edges() == 2018
+    assert g.earliest_time == 7257619
+    assert g.latest_time == 7343970
+
+    assert len(g.nodes.type_filter(["A"]).name.collect()) == 785
+    assert len(g.nodes.type_filter([""]).name.collect()) == 0
+    assert len(g.nodes.type_filter(["A", "B"]).name.collect()) == 1619
+
+    neighbor_names = g.nodes.type_filter(["A"]).neighbours.name.collect()
+    total_length = sum(len(names) for names in neighbor_names)
+    assert total_length == 2056
+
+    assert g.nodes.type_filter([]).name.collect() == []
+
+    neighbor_names = g.nodes.type_filter(["A"]).neighbours.type_filter(["B"]).name.collect()
+    total_length = sum(len(names) for names in neighbor_names)
+    assert total_length == 1023
+
+    assert g.node("Comp175846").neighbours.type_filter(["A"]).name.collect() == ["Comp844043"]
+    assert g.node("Comp175846").neighbours.type_filter(["B"]).name.collect() == []
+    assert g.node("Comp175846").neighbours.type_filter([]).name.collect() == []
+    assert g.node("Comp175846").neighbours.type_filter(["A", "B"]).name.collect() == ["Comp844043"]
+
+    neighbor_names = g.node("Comp175846").neighbours.neighbours.name.collect()
+    assert len(neighbor_names) == 193