Skip to content

Commit

Permalink
Updates for supporting glob patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
DMickens committed Jan 26, 2024
1 parent 49bf90c commit 374183d
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 25 deletions.
35 changes: 23 additions & 12 deletions packages/vertica-nodejs/lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
var net = require('net')
var fs = require('fs')
var EventEmitter = require('events').EventEmitter
const glob = require('glob')

const { parse, serialize } = require('v-protocol')

Expand Down Expand Up @@ -330,19 +331,29 @@ class Connection extends EventEmitter {
})
}

sendCopyDataFile(msg) {
sendCopyDataFiles(msg) {
const buffer = Buffer.alloc(bufferSize);
const fd = fs.openSync(msg.fileName, 'r');
let bytesRead = 0;
do {
// read bufferSize bytes from the file into our buffer starting at the current position in the file
bytesRead = fs.readSync(fd, buffer, 0, bufferSize, null);
if (bytesRead > 0) {
// Process the chunk (buffer.slice(0, bytesRead)) here
this.sendCopyData(buffer.subarray(0, bytesRead))
}
} while (bytesRead > 0);
fs.closeSync(fd);
let expandedFileNames = []
if (/[*?[\]]/.test(msg.fileName)) { // contains glob pattern
const matchingFiles = glob.sync(msg.fileName)
expandedFileNames = expandedFileNames.concat(matchingFiles)
} else {
expandedFileNames.push(msg.fileName)
}
const uniqueFileNames = [...new Set(expandedFileNames)] // remove duplicates
for (const fileName of uniqueFileNames) {
const fd = fs.openSync(fileName, 'r');
let bytesRead = 0;
do {
// read bufferSize bytes from the file into our buffer starting at the current position in the file
bytesRead = fs.readSync(fd, buffer, 0, bufferSize, null);
if (bytesRead > 0) {
// Process the chunk (buffer.slice(0, bytesRead)) here
this.sendCopyData(buffer.subarray(0, bytesRead))
}
} while (bytesRead > 0);
fs.closeSync(fd);
}
this.sendEndOfBatchRequest()
}

Expand Down
25 changes: 19 additions & 6 deletions packages/vertica-nodejs/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const utils = require('./utils')
const fs = require('fs')
const fsPromises = require('fs').promises
const stream = require('stream')
const glob = require('glob')

class Query extends EventEmitter {
constructor(config, values, callback) {
Expand Down Expand Up @@ -265,11 +266,23 @@ class Query extends EventEmitter {

async handleVerifyFiles(msg, connection) {
if (msg.numFiles !== 0) { // we are copying from file, not stdin
try { // Check if the data file can be read
await fsPromises.access(msg.files[0], fs.constants.R_OK);
} catch (readInputFileErr) { // Can't open input file for reading, send CopyError
connection.sendCopyError(msg.files[0], 0, '', "Unable to open input file for reading")
return;
let expandedFileNames = []
for (const fileName of msg.files) {
if (/[*?[\]]/.test(fileName)) { // contains glob pattern
const matchingFiles = glob.sync(fileName)
expandedFileNames = expandedFileNames.concat(matchingFiles)
} else {
expandedFileNames.push(fileName)
}
}
const uniqueFileNames = [...new Set(expandedFileNames)] // remove duplicates
for (const fileName of uniqueFileNames) {
try { // Check if the data file can be read
await fsPromises.access(fileName, fs.constants.R_OK);
} catch (readInputFileErr) { // Can't open input file for reading, send CopyError
connection.sendCopyError(fileName, 0, '', "Unable to open input file for reading")
return;
}
}
} else { // check to make sure the readableStream is in fact a readableStream
if (!(this.copyStream instanceof stream.Readable)) {
Expand Down Expand Up @@ -317,7 +330,7 @@ class Query extends EventEmitter {
}

handleLoadFile(msg, connection) {
connection.sendCopyDataFile(msg)
connection.sendCopyDataFiles(msg)
}

handleWriteFile(msg, connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ describe('Running Copy From Local File Commands', function () {
const badFileName = "copy-bad.dat"
const goodFilePath = path.join(process.cwd(), goodFileName);
const badFilePath = path.join(process.cwd(), badFileName)
const goodFileContents = "1|'a'\n2|'b'\n3|'c'\n4|'d'\n5|'e'" // 5 correctly formatted rows
const badFileContents = "1|'a'\n'b'|2\n3|'c'\n'd'|4\n5|'e'" // rows 2 and 4 malformed
const goodFileContents = "1|a\n2|b\n3|c\n4|d\n5|e\n" // 5 correctly formatted rows
const badFileContents = "6|f\ng|7\n8|h\ni|9\n10|j\n" // rows 2 and 4 malformed

// generate temporary test files, create table before tests begin
before((done) => {
Expand Down Expand Up @@ -66,7 +66,7 @@ describe('Running Copy From Local File Commands', function () {
assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents
fs.readFile('rejects.txt', 'utf8', (err, data) => {
assert.equal(err, undefined)
assert.equal(data, "'b'|2\n'd'|4\n") // rows 2 and 4 are malformed
assert.equal(data, "g|7\ni|9\n") // rows 2 and 4 are malformed
})
} finally {
fs.unlink('rejects.txt', done)
Expand Down Expand Up @@ -186,10 +186,6 @@ describe('Running Copy From Local File Commands', function () {

})

it('succeeds using glob patterns', function(done) {
done()
})

it('succeeds with multiple input files', function(done) {
pool.query("COPY copyTable FROM LOCAL 'copy-good.dat', 'copy-bad.dat' RETURNREJECTED", (err, res) => {
assert.equal(err, undefined)
Expand All @@ -198,4 +194,16 @@ describe('Running Copy From Local File Commands', function () {
done()
})
})

it('succeeds using glob patterns', function(done) {
pool.query("COPY copyTable FROM LOCAL 'copy-*.dat' RETURNREJECTED", (err, res) => {
assert.equal(err, undefined)
assert.equal(res.rows[0]['Rows Loaded'], 8) // 5 good rows in goodFileContents
assert.equal(res.getRejectedRows().length, 2) // check the length instead of position in case the order of files loaded changes
pool.query({text: "SELECT num FROM copyTable ORDER BY num ASC", rowMode: 'array'}, (err, res) => {
assert.deepEqual(res.rows, [[1],[2],[3],[4],[5],[6],[8],[10]]) // 7 and 9 malformed.
done()
})
})
})
})

0 comments on commit 374183d

Please sign in to comment.