/**
 * server - the main "air" server application.
 *
 * This module starts the database, scraper, and api components. It also processes the scraped data and
 * sets up periodic scrapes. Could probably be much cleaner...
 */
"use strict";
console.log("============================================================");
console.log(new Date().toISOString() + " - Starting");
var util = require("util");
var _ = require("underscore");
var when = require("when");
var db = require("./db");
var scraper = require("./scraper");
var tool = require("./tool");
var schema = require("./schema");
var api = require("./api");
var stationsData = require("./station-data");
var log = tool.log();
var iconvShiftJIStoUTF8 = new (require("iconv")).Iconv("SHIFT_JIS", "UTF-8//TRANSLIT//IGNORE");
var shiftJIStoUTF8 = iconvShiftJIStoUTF8.convert.bind(iconvShiftJIStoUTF8);
var scrapeURL = process.argv[4];
var stationNames = {};
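/**
 * Extracts the observation date from the page heading, which has the form −YYYY年MM月DD日HH時−,
 * and converts it to an ISO 8601 string in JST (+09:00).
 */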
function extractP160DateTime(dom) {
    var parts = scraper.matchText(/−(.*)年(.*)月(.*)日(.*)時.*−/, dom)[0];
    return tool.toISOString({year: parts[1], month: parts[2], day: parts[3], hour: parts[4], zone: "+09:00"});
}
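/**
 * Converts a 16-point compass direction (e.g., "NNE") to degrees. Unrecognized values pass
 * through unchanged.
 */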
function cardinalToDegrees(s) {
    switch (s) {
        case "N":   return 0;
        case "NNE": return 22.5;
        case "NE":  return 45;
        case "ENE": return 67.5;
        case "E":   return 90;
        case "ESE": return 112.5;
        case "SE":  return 135;
        case "SSE": return 157.5;
        case "S":   return 180;
        case "SSW": return 202.5;
        case "SW":  return 225;
        case "WSW": return 247.5;
        case "W":   return 270;
        case "WNW": return 292.5;
        case "NW":  return 315;
        case "NNW": return 337.5;
        case "C":   return 360;  // calm; map to 360 to distinguish from 0 (N) and null (no sample)
        default:    return s;
    }
}
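/**
 * Adds the measurement {tag: value} to the target object, skipping blank or "-" (missing) values
 * and converting the station's reported units to standard units where necessary.
 */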
function addTag(target, tag, value) {
    value = (value || "").trim();
    if (value.length > 0 && value !== "-" && value !== " ") {
        var scale = 1;
        switch (tag) {
            case "temp": scale = 0.1; break;    // 0.1 deg C -> deg C
            case "hum":  scale = 0.1; break;    // 0.1% -> 1%
            case "wv":
                value = (value == "C" ? 0 : value);
                scale = 0.1;                    // 0.1 m/s -> 1 m/s
                break;
            case "wd":
                value = cardinalToDegrees(value);
                break;
            case "in":   scale = 0.01; break;   // 0.01 MJ/m2 -> MJ/m2
            case "no":   scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "no2":  scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "nox":  scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "ox":   scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "so2":  scale = 0.001; break;  // mm3/m3 -> cm3/m3
            case "co":   scale = 0.1; break;    // 0.1 cm3/m3 -> cm3/m3
            case "ch4":  scale = 0.01; break;   // 10 mm3/m3 -> cm3/m3
            case "nmhc": scale = 0.01; break;   // 10 mm3/m3 -> cm3/m3
            case "spm":  break;                 // μg/m3
            case "pm25": break;                 // μg/m3
        }
        target[tag] = value * scale;
    }
    return target;
}
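/**
 * Throws if the scraped P160 table header differs from the expected column layout, guarding
 * against silently mis-parsing the page should the source site change its format.
 */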
function validateP160Header(header) {
    var expected =
        "番号,局番,局名,SO2,ppb,Ox,ppb,NO,ppb,NO2,ppb,NOx,ppb,CO,0.1ppm,SPM,μg/m3,NMHC," +
        "pphmC,CH4,pphmC,PM2.5,μg/m3,風向,風速,0.1m/s,気温,0.1度,湿度,0.1%,日射量,0.01MJ/m2";
    var actual = "" + header;
    if (actual !== expected) {
        // Throw an Error object (not a bare string) so the top-level handler can log e.stack.
        throw new Error("Expected P160 header:\n    " + expected + "\nbut found:\n    " + actual);
    }
}
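/**
 * Converts one row of the P160 data table into a sample record and returns a db upsert statement
 * for it. Also registers the station's name and increments counts.windData when wind is present.
 */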
function processP160Row(row, date, counts) {
    /*  0     1     2     3    4    5    6    7    8    9    10    11    12    13    14    15    16    17
       番号  局番  局名  SO2   Ox   NO  NO2  NOx   CO  SPM  NMHC  CH4  PM2.5  風向  風速  気温  湿度  日射量 */
    var stationId = row[1] * 1;
    var stationName = row[2];
    stationNames[stationName] = stationId;
    var item = { stationId: stationId, stationName: stationName, date: date };
    addTag(item, "so2", row[3]);
    addTag(item, "ox", row[4]);
    addTag(item, "no", row[5]);
    addTag(item, "no2", row[6]);
    addTag(item, "nox", row[7]);
    addTag(item, "co", row[8]);
    addTag(item, "spm", row[9]);
    addTag(item, "nmhc", row[10]);
    addTag(item, "ch4", row[11]);
    addTag(item, "pm25", row[12]);
    addTag(item, "wd", row[13]);
    addTag(item, "wv", row[14]);
    addTag(item, "temp", row[15]);
    addTag(item, "hum", row[16]);
    addTag(item, "in", row[17]);
    if (item.hasOwnProperty("wd") && item.hasOwnProperty("wv")) {
        // Increment each time we find a row with wind data.
        counts.windData += 1;
    }
    return db.upsert(schema.samples, item);
}
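/**
 * Processes a scraped P160 page into a list of database statements, or null if the page does not
 * contain a complete data table.
 */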
function processP160(dom) {
    log.info("Processing P160...");
    var tables = scraper.tablesOf(dom);
    if (tables.length < 4) {
        log.error("no data found");
        return null;
    }
    var header = scraper.extract(tables[2])[0];  // table at index two is the header
    validateP160Header(header);
    var date = extractP160DateTime(dom);
    var rows = scraper.extract(tables[3]);  // table at index three is the data
    var counts = {windData: 0};
    var statements = rows.map(function(row) { return processP160Row(row, date, counts); });
    if (counts.windData < 5) {
        // The table we're scraping is probably incomplete.
        log.error("insufficient wind data found: " + counts.windData);
        return null;
    }
    return statements;
}
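/**
 * Creates the database tables for stations and samples.
 */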
function start() {
    log.info("Preparing tables...");
    return persist([db.createTable(schema.stations), db.createTable(schema.samples)]);
}
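/**
 * Fetches the specified page of the P160 report, converted from Shift_JIS to UTF-8. The date, if
 * provided, is encoded as UNIX seconds; when absent, the latest report is presumably fetched.
 */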
function scrapeP160(page, date) {
    date = date ? Math.floor(date.getTime() / 1000) : "";
    var url = tool.format("{0}/p160.cgi?no2=={1}={2}==2====2=", scrapeURL, date, page);
    return scraper.fetch(url, shiftJIStoUTF8);
}
function persist(statements) {
    if (!statements) {
        return when.resolve(null);
    }
    log.info("Persisting...");
    return db.executeAll(statements);
}
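/**
 * Scrapes, processes, and persists one page of the P160 report.
 */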
function doP160Page(page, date) {
    return scrapeP160(page, date)
        .then(processP160)
        .then(persist);
}
function doP160(date) {
    // Return a promise for a boolean which is false if data was processed, and true if data was not
    // available; i.e., true == we are done.
    // CONSIDER: This function's behavior is subtle and confusing. Improve.
    var promises = [doP160Page(1, date), doP160Page(2, date)];
    return when.reduce(
        promises,
        function(current, value) {
            return current && !value;
        },
        true);
}
function pollP160ForUpdates() {
    // Return a promise for a boolean which is true if new data was found. New data is found if the database
    // reports that rows have been inserted or updated after scraping both pages.
    // CONSIDER: This function's behavior is subtle and confusing. Improve.
    var promises = [doP160Page(1), doP160Page(2)];
    function sumRowCounts(current, value) {
        if (value) {
            value.forEach(function(result) {
                current += result.rowCount;  // abstraction leakage -- relying on rowCount to exist
            });
        }
        return current;
    }
    return when.reduce(promises, sumRowCounts, 0).then(
        function(rowsInserted) {
            log.info("results of poll: rowsInserted = " + rowsInserted);
            // Expect at least 60 samples, otherwise scrape not successful. Ugh.
            var foundNewData = rowsInserted >= 60;
            if (foundNewData) {
                log.info("resetting query memos");
                api.resetQueryMemos();
            }
            return foundNewData;
        });
}
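/**
 * Upserts station rows built from the names collected during scraping and from the static
 * station-data module (id, name, address, and coordinates).
 */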
function doStationDetails() {
    log.info("Preparing station details...");
    var statements = [];
    _.keys(stationNames).forEach(function(name) {
        statements.push(db.upsert(schema.stations, {id: stationNames[name], name: name}));
    });
    stationsData.forEach(function(station) {
        var row = {
            id: station[0],
            name: station[1],
            address: station[2],
            latitude: station[3],
            longitude: station[4]
        };
        statements.push(db.upsert(schema.stations, row));
    });
    return persist(statements);
}
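/**
 * Scrapes historical reports for the specified number of hours into the past, processing one
 * timestamp at a time, from most recent to oldest.
 */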
function doP160Historical(hours) {
    log.info("Starting P160 Historical...");
    var now = new Date().getTime();
    var dates = [];
    for (var i = 1; i <= hours; i++) {
        dates.push(new Date(now - (i * 60 * 60 * 1000)));
    }
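    // Pause for three seconds before passing the value along, likely to space out requests
    // against the source server.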
    function wait(x) {
        var d = when.defer();
        setTimeout(function() { d.resolve(x); }, 3000);
        return d.promise;
    }
    return function doAnotherDate(done) {
        if (dates.length > 0 && !done) {
            var date = dates.shift();
            log.info(tool.format("Processing {0}... (remaining: {1})", date, dates.length));
            return doP160(date).then(wait).then(doAnotherDate);
        }
        else {
            log.info("Finished P160 Historical");
        }
    }(false);
}
/**
 * Look for new air data every hour.
 */
function pollForUpdates() {
    var ONE_SECOND = 1000;
    var ONE_MINUTE = 60 * ONE_SECOND;
    var ONE_HOUR = 60 * ONE_MINUTE;
    // Wait an exponentially longer amount of time after each retry, up to 8 min.
    function exponentialBackoff(t) {
        return Math.min(Math.pow(2, t < 0 ? -(t + 1) : t), 8) * ONE_MINUTE;
    }
    // The air data is updated every hour, but we don't know exactly when. By specifying initialRetry = -1,
    // the pages get scraped a little earlier than the estimated time. Eventually, the algorithm will center
    // itself on the actual time, even if it varies a bit.
    tool.setFlexInterval(pollP160ForUpdates, ONE_MINUTE, ONE_HOUR, exponentialBackoff, -1);
}
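// Main sequence: prepare tables, scrape the latest report, register station details, begin hourly
// polling, then backfill historical data.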
start()
    .then(doP160.bind(undefined, null))
    .then(doStationDetails)
    .then(pollForUpdates)
    .then(doP160Historical.bind(undefined, 0/*9 * 24*/))  // up to nine days of historical data available
    .then(null, function(e) { log.error(e.stack); });