forked from kedro-org/kedro-viz
-
Notifications
You must be signed in to change notification settings - Fork 0
/
normalize-data.js
338 lines (310 loc) · 8.85 KB
/
normalize-data.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import { params } from '../config';
import {
arrayToObject,
prettifyName,
stripNamespace,
prettifyModularPipelineNames,
} from '../utils';
/**
 * Build a brand-new pipeline state object filled with default values.
 * Every call allocates fresh nested objects and arrays, so the result can be
 * mutated without affecting any other instance.
 * @return {Object} state
 */
export const createInitialPipelineState = () => {
  const pipeline = { ids: [], name: {} };

  const modularPipeline = {
    ids: [],
    tree: {},
    visible: {},
    expanded: [],
    active: {},
    disabled: {},
  };

  // Lookup tables for node attributes, plus the clicked/hovered markers
  const node = {
    ids: [],
    name: {},
    fullName: {},
    type: {},
    tags: {},
    layer: {},
    disabled: {},
    pipelines: {},
    clicked: null,
    hovered: null,
    fetched: {},
    code: {},
    parameters: {},
    filepath: {},
    inputs: {},
    outputs: {},
    plot: {},
    image: {},
    trackingData: {},
    datasetType: {},
    originalType: {},
    transcodedTypes: {},
    runCommand: {},
    modularPipelines: {},
  };

  // Of the node types, only 'parameters' starts out disabled
  const nodeType = {
    ids: ['task', 'data', 'parameters', 'modularPipeline'],
    name: {
      data: 'Datasets',
      task: 'Nodes',
      parameters: 'Parameters',
      modularPipeline: 'Modular Pipelines',
    },
    disabled: {
      parameters: true,
      task: false,
      data: false,
    },
  };

  const edge = { ids: [], sources: {}, targets: {} };
  const layer = { ids: [], name: {}, visible: true };
  const tag = { ids: [], name: {}, active: {}, enabled: {} };
  const slice = { from: null, to: null, apply: false };

  return {
    pipeline,
    modularPipeline,
    node,
    nodeType,
    edge,
    layer,
    tag,
    slice,
    hoveredParameters: false,
    hoveredFocusMode: false,
  };
};
/**
 * Decide whether the supplied data can be normalized.
 * @param {Object|String} data - The parsed data input, or the string 'json'
 * @return {Boolean} True when `data` has `nodes` and `edges` arrays; false
 * when `data` is the string 'json'
 * @throws {Error} When `data` is falsy, or when `nodes`/`edges` are not arrays
 */
const validateInput = (data) => {
  if (!data) {
    throw new Error('No data provided to Kedro-Viz');
  }

  // The literal string 'json' means the data is still loading
  if (data === 'json') {
    return false;
  }

  const hasEdgesAndNodes =
    Array.isArray(data.edges) && Array.isArray(data.nodes);
  if (hasEdgesAndNodes) {
    return true;
  }

  // Skip the console output when the `jest` global is defined
  if (typeof jest === 'undefined') {
    console.error('Invalid Kedro-Viz data:', data);
  }
  throw new Error(
    'Invalid Kedro-Viz data input. Please ensure that your pipeline data includes arrays of nodes and edges'
  );
};
/**
 * Build a reproducible edge ID by joining the source and target values with
 * a pipe character.
 * @param {String} source - Value identifying the source node (presumably a
 * node ID string; confirm against the edge data)
 * @param {String} target - Value identifying the target node
 * @return {String} ID of the form 'source|target'
 */
const createEdgeID = (source, target) => {
  const endpoints = [source, target];
  return endpoints.join('|');
};
/**
 * Add a new pipeline to the state if it doesn't already exist
 * @param {Object} state - Pipeline state; `state.pipeline` is mutated in place
 * @return {Function} Callback accepting a pipeline object with a unique
 * `id` and an optional `name`
 */
const addPipeline = (state) => (pipeline) => {
  const { id } = pipeline;
  // Use an own-property check instead of truthiness: a truthiness test fails
  // to detect an entry whose stored name is falsy (e.g. an empty string) and
  // wrongly matches inherited keys such as 'constructor'.
  if (Object.prototype.hasOwnProperty.call(state.pipeline.name, id)) {
    return;
  }
  state.pipeline.ids.push(id);
  state.pipeline.name[id] = prettifyName(pipeline.name || '');
};
/**
 * Add a new node to the state if it doesn't already exist
 * @param {Object} state - Pipeline state; the `state.node` lookup tables are
 * mutated in place
 * @return {Function} Callback accepting a raw node object (`id`, `name`,
 * `type`, `layer`, `pipelines`, `tags`, plus optional metadata fields)
 */
const addNode = (state) => (node) => {
  const { id } = node;
  // Use an own-property check instead of truthiness: a truthiness test fails
  // to detect a node whose stored name is falsy (e.g. an empty string) and
  // wrongly matches inherited keys such as 'constructor'.
  if (Object.prototype.hasOwnProperty.call(state.node.name, id)) {
    return;
  }
  state.node.ids.push(id);
  state.node.name[id] = prettifyName(stripNamespace(node.name || ''));
  state.node.fullName[id] = node.name;
  state.node.type[id] = node.type;
  state.node.layer[id] = node.layer;
  // Convert the list of pipeline IDs into an { [pipelineId]: true } lookup
  state.node.pipelines[id] = node.pipelines
    ? arrayToObject(node.pipelines, () => true)
    : {};
  state.node.tags[id] = node.tags || [];
  // Support for metadata in case it exists on initial load
  state.node.code[id] = node.code;
  state.node.parameters[id] = node.parameters;
  state.node.filepath[id] = node.filepath;
  state.node.datasetType[id] = node.dataset_type;
  state.node.originalType[id] = node.original_type;
  state.node.transcodedTypes[id] = node.transcoded_types;
  // NOTE(review): sibling metadata fields are read in snake_case; confirm
  // whether the incoming field here is `runCommand` or `run_command`.
  state.node.runCommand[id] = node.runCommand;
  state.node.modularPipelines[id] = node.modular_pipelines || [];
};
/**
 * Create a new link between two nodes and record it in the edge state,
 * unless an edge with the same ID has already been stored
 * @param {Object} state - Pipeline state; `state.edge` is mutated in place
 * @return {Function} Callback accepting an edge object with `source` and
 * `target` values
 */
const addEdge =
  (state) =>
  ({ source, target }) => {
    const id = createEdgeID(source, target);
    // `sources` gains a key for every edge stored below, so an own-property
    // lookup detects duplicates in constant time instead of scanning `ids`.
    if (Object.prototype.hasOwnProperty.call(state.edge.sources, id)) {
      return;
    }
    state.edge.ids.push(id);
    state.edge.sources[id] = source;
    state.edge.targets[id] = target;
  };
/**
 * Add a new Tag if it doesn't already exist
 * @param {Object} state - Pipeline state; `state.tag` is mutated in place
 * @return {Function} Callback accepting a tag object with an `id` and an
 * optional `name`
 */
const addTag = (state) => (tag) => {
  const { id } = tag;
  // Skip tags that were already stored so `ids` never contains duplicates,
  // matching the de-duplication done for pipelines, nodes and edges.
  if (Object.prototype.hasOwnProperty.call(state.tag.name, id)) {
    return;
  }
  state.tag.ids.push(id);
  state.tag.name[id] = prettifyName(tag.name || '');
};
/**
 * Add a new Layer if it doesn't already exist
 * @param {Object} state - Pipeline state; `state.layer` is mutated in place
 * @return {Function} Callback accepting a layer name
 */
const addLayer = (state) => (layer) => {
  // Skip layers that were already stored so `ids` never contains duplicates,
  // matching the de-duplication done for pipelines, nodes and edges.
  if (Object.prototype.hasOwnProperty.call(state.layer.name, layer)) {
    return;
  }
  // Using the layer name as both layer ID and name.
  // It futureproofs it if we need a separate layer ID in the future.
  state.layer.ids.push(layer);
  state.layer.name[layer] = layer;
};
/**
 * Break a comma-separated query-param string into its non-empty parts.
 * @param {String} queryParams - Query params from URL
 * @return {Array} The non-empty parts, or an empty array when `queryParams`
 * is falsy
 */
const splitQueryParams = (queryParams) => {
  if (!queryParams) {
    return [];
  }
  const parts = queryParams.split(',');
  return parts.filter((part) => part.length > 0);
};
/**
 * Apply the tag filters named in the URL to the state. When the query param
 * lists at least one tag, `state.tag.enabled` is replaced with a lookup that
 * marks each known tag as enabled only if its ID was listed; otherwise the
 * state is left as it is.
 * @param {Object} state - State object (mutated in place)
 * @param {String} tagsQueryParam - Comma-separated tag IDs from the URL
 * @param {Array} allNodeTags - List of all tag objects, each with an `id`
 * @return {Object} The same state object that was passed in
 */
const getNodeTagsFiltersFromUrl = (state, tagsQueryParam, allNodeTags = []) => {
  const requestedTagIds = splitQueryParams(tagsQueryParam);
  if (requestedTagIds.length === 0) {
    return state;
  }

  const requestedLookup = new Set(requestedTagIds);
  const enabled = {};
  allNodeTags.forEach((tag) => {
    enabled[tag.id] = requestedLookup.has(tag.id);
  });
  state.tag.enabled = enabled;

  return state;
};
/**
 * Apply the node types named in the URL to the state. When the query param
 * lists at least one type, every key already present in
 * `state.nodeType.disabled` is set to `true` unless that key was listed.
 * @param {Object} state - The current state object (mutated in place).
 * @param {string} typeQueryParams - Comma-separated node types from the URL.
 * @returns {Object} - The same state object that was passed in.
 */
const getNodeTypesFromUrl = (state, typeQueryParams) => {
  const requestedTypes = splitQueryParams(typeQueryParams);
  if (requestedTypes.length === 0) {
    return state;
  }

  const { disabled } = state.nodeType;
  for (const type of Object.keys(disabled)) {
    const isRequested = requestedTypes.includes(type);
    disabled[type] = !isRequested;
  }

  return state;
};
/**
 * Read the type and tag filters from the current page URL, apply them to the
 * state through the tag and type helpers, and return a shallow copy of the
 * merged result.
 * @param {Object} state - State object
 * @param {Array} NodeTags - List of all associated tags
 * @returns {Object} - A new object merging the state with both helper results.
 */
const updateStateWithFilters = (state, NodeTags) => {
  const urlParams = new URLSearchParams(window.location.search);
  const typesFromUrl = urlParams.get(params.types);
  const tagsFromUrl = urlParams.get(params.tags);

  // Tag filters are applied first, then node-type filters
  const stateWithTagFilters = getNodeTagsFiltersFromUrl(
    state,
    tagsFromUrl,
    NodeTags
  );
  const stateWithTypeFilters = getNodeTypesFromUrl(state, typesFromUrl);

  // Later spreads win, so the type-filter result takes precedence
  return {
    ...state,
    ...stateWithTagFilters,
    ...stateWithTypeFilters,
  };
};
/**
 * Convert the pipeline data into a normalized state object
 * @param {Object|String} data - Raw unformatted data input, or the string
 * 'json' while the data is still loading
 * @param {Boolean} expandAllPipelines - When truthy, every modular pipeline
 * is marked as expanded and every node that is not itself a modular pipeline
 * ID is marked visible; otherwise only the children of the '__root__'
 * modular pipeline are marked visible
 * @return {Object} Formatted, normalized state
 * @throws {Error} Propagated from validateInput when the data is missing or
 * lacks `nodes`/`edges` arrays
 */
const normalizeData = (data, expandAllPipelines) => {
  const state = createInitialPipelineState();

  if (data === 'json') {
    state.dataSource = 'json';
  } else if (data && data.source) {
    // `data` is guarded so that null/undefined reaches validateInput and
    // raises its descriptive error instead of a TypeError on `data.source`.
    state.dataSource = data.source;
  }

  if (!validateInput(data)) {
    return state;
  }

  data.nodes.forEach(addNode(state));
  data.edges.forEach(addEdge(state));

  if (data.pipelines) {
    data.pipelines.forEach(addPipeline(state));
    if (state.pipeline.ids.length) {
      state.pipeline.main = data.selected_pipeline || state.pipeline.ids[0];
      state.pipeline.active = state.pipeline.main;
    }
  }

  if (data.modular_pipelines) {
    const modularPipelineIds = Object.keys(data.modular_pipelines);
    state.modularPipeline.ids = modularPipelineIds;
    state.modularPipeline.tree = prettifyModularPipelineNames(
      data.modular_pipelines
    );

    // Case for expandAllPipelines in component props or within state
    if (expandAllPipelines) {
      // Assign all modular pipelines into expanded state. A copy is stored so
      // that later changes to `expanded` cannot alter `ids` (and vice versa).
      state.modularPipeline.expanded = [...modularPipelineIds];

      // Assign all nodes as visible nodes in modular pipelines; the Set keeps
      // the membership test constant-time for each node.
      const modularPipelineIdSet = new Set(modularPipelineIds);
      state.node.ids.forEach((nodeId) => {
        if (!modularPipelineIdSet.has(nodeId)) {
          state.modularPipeline.visible[nodeId] = true;
        }
      });
    } else {
      // Collapsed by default: only the direct children of the root are visible
      const root = data.modular_pipelines['__root__'];
      const rootChildren = (root && root.children) || [];
      for (const child of rootChildren) {
        state.modularPipeline.visible[child.id] = true;
      }
    }
  }

  if (data.tags) {
    data.tags.forEach(addTag(state));
  }

  if (data.layers) {
    data.layers.forEach(addLayer(state));
  }

  return updateStateWithFilters(state, data.tags);
};

export default normalizeData;