-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_arangoimp_multi.js
95 lines (81 loc) · 2.85 KB
/
stream_arangoimp_multi.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
console.time("exchange");
var mysql = require('mysql');
var arango = require('arango');
var child_process = require('child_process');
var async = require("async");
var json_transform = require('./json_transform_stream');
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

// MySQL table the rows are read from (interpolated into the SQL as-is).
var source = "_import_mysql.fa_stammdaten";

// Number of LIMIT slices the table is split into; one migration job per slice.
var num_jobs = 4;

// Options handed to mysql.createConnection().
var mysql_credentials = {
  host: "localhost",
  user: "root",
  password: ""
};

// Command-line arguments for every spawned arangoimp process.
var args = [
  "--server.database", "amtub",
  "--collection", "fa_stammdaten_stream_arangoimp",
  "--create-collection", "true",
  // doesn't print anything when streaming file
  "--progress", "true",
  "--type", "json",
  // the rows are piped into the child's stdin by migrate()
  "--file", "-"
];

// set path to arangodb binaries here
process.chdir("D:/Webserver/arangodb/bin");
/**
 * Counts the rows of the `source` table and splits them into `num_jobs`
 * consecutive slices, each expressed as the argument of a MySQL LIMIT clause
 * ("offset,row_count").
 *
 * Exits the process with status 1 if the COUNT query fails.
 *
 * @param {function(string[])} callback - receives the list of LIMIT arguments,
 *   one entry per job, in ascending offset order.
 */
function getRecordCount(callback) {
  var connection = mysql.createConnection(mysql_credentials);
  //connection.connect(...);
  connection.query('SELECT COUNT(*) AS count FROM ' + source, function(err, rows) {
    if (err) {
      console.log("Error", err);
      // A failed count must not look like a successful run to the caller's shell.
      process.exit(1);
      return;
    }

    var row_count = rows[0].count;
    console.log("Total rows:", row_count);

    // Rows per job. Math.floor instead of `| 0`: the bitwise form truncates to
    // a signed 32-bit integer and yields garbage for slices above 2^31 - 1 rows.
    var count = Math.floor(row_count / num_jobs);

    var offsets = [];
    for (var job = 0; job < num_jobs; job++) {
      var is_last = (job === num_jobs - 1);
      // select all remaining rows by using the (almost) highest number
      // http://stackoverflow.com/questions/255517/mysql-offset-infinite-rows
      var limit = is_last ? Math.pow(2, 63) : count;
      offsets.push(String(count * job) + ',' + limit);
    }

    connection.end();
    callback(offsets);
  });
}
/**
 * Starts one migration job per LIMIT argument via async.each and terminates
 * the process when async.each invokes the completion handler.
 *
 * The process exit status is 0 when every job succeeded and 1 when a job
 * reported an error, so calling scripts can detect a failed migration.
 *
 * @param {string[]} offsets - LIMIT arguments ("offset,row_count") as produced
 *   by getRecordCount().
 */
function startMigration(offsets) {
  async.each(offsets,
    function(offset, callback) {
      console.log("Migrating", offset);
      migrate(offset, callback);
    },
    function(err) {
      if (err) {
        console.log("Error:", err);
      } else {
        console.log("\nAll done.");
      }
      // Stops the "exchange" timer started at the top of the script.
      console.timeEnd("exchange");
      process.exit(err ? 1 : 0);
    }
  );
}
/**
 * Streams one slice of the `source` table into a freshly spawned arangoimp
 * process: MySQL row stream -> JSON encoder -> arangoimp stdin.
 *
 * The callback is invoked exactly once. It receives no argument when
 * arangoimp exits with code 0, and an Error when arangoimp cannot be started,
 * exits with a non-zero code / is killed by a signal, or any stream in the
 * pipeline emits 'error'.
 *
 * @param {string} offset - argument of the LIMIT clause ("offset,row_count").
 * @param {function(Error=)} callback - completion handler.
 */
function migrate(offset, callback) {
  //console.log("Connecting to MySQL");
  var connection = mysql.createConnection(mysql_credentials);
  //connection.connect(...);
  // if multiple processes pipe to main process, console log will be messed up, thus ignoring
  var arangoimp = child_process.spawn("arangoimp", args, {stdio: ['pipe', 'ignore', 'ignore']});
  var finished = false;

  // Single exit point: releases resources and guarantees one callback call,
  // even when several failure events fire for the same job.
  function finish(err) {
    if (finished) return;
    finished = true;
    if (err) {
      // Stop reading rows right away and make sure the importer does not
      // keep waiting for input that will never arrive.
      connection.destroy();
      arangoimp.kill();
    } else {
      connection.end(); // close MySQL connection
    }
    callback(err);
  }

  // Emitted e.g. when the arangoimp binary cannot be spawned.
  arangoimp.on('error', finish);
  arangoimp.on('exit', function(code, signal) {
    console.log("\nexit", code, signal);
    if (code === 0) {
      finish();
    } else {
      finish(new Error("arangoimp failed for LIMIT " + offset +
        " (code " + code + ", signal " + signal + ")"));
    }
  });
  // Writing to a child that already died raises an error on its stdin.
  arangoimp.stdin.on('error', finish);

  var encoder = new json_transform.stream();
  encoder.on('error', finish);

  // NOTE(review): there is no ORDER BY, so MySQL is not required to return
  // rows in a stable order and the parallel slices may overlap or miss rows;
  // consider ordering by the primary key — confirm against the table schema.
  var query = connection.query('SELECT * FROM ' + source + ' LIMIT ' + offset);
  var query_stream = query.stream();
  // pipe() does not forward errors downstream, so watch the source as well.
  query_stream.on('error', finish);

  query_stream.pipe(encoder).pipe(arangoimp.stdin);
}
// Run: count the rows first; getRecordCount passes the computed LIMIT
// arguments to startMigration, which launches the import jobs.
getRecordCount(startMigration);