Skip to content

Commit

Permalink
Refactor how the stream is provided to the driver for copy from stdin
Browse files Browse the repository at this point in the history
  • Loading branch information
DMickens committed Jan 26, 2024
1 parent 374183d commit 3299b07
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 15 deletions.
2 changes: 1 addition & 1 deletion packages/vertica-nodejs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ Copy from local stdin in vertica-nodejs can be better described as copy from loc
const readableStream = fs.createReadStream(filePath) // assumes filePath is a string containing the path to a data file
client.query("CREATE LOCAL TEMP TABLE myTable(x int)", (err) => {
if (err) console.log(err)
client.query("COPY myTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => {
client.query({text: "COPY myTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => {
console.log(err || res.getRejectedRows())
client.end()
})
Expand Down
15 changes: 6 additions & 9 deletions packages/vertica-nodejs/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ class Query extends EventEmitter {
super()

config = utils.normalizeQueryConfig(config, values, callback)

this.text = config.text
this.values = config.values
this.rows = config.rows
this.types = config.types
this.name = config.name
this.binary = config.binary || false
this.copyStream = config.copyStream || null
// use unique portal name each time
this.portal = config.portal || ''
this.callback = config.callback
Expand All @@ -50,9 +50,6 @@ class Query extends EventEmitter {
this._canceledDueToError = false
this._activeError = false
this._promise = null
if (this.values) {
this.copyStream = this.values.copyStream || null
}
}

requiresPreparation() {
Expand All @@ -70,7 +67,7 @@ class Query extends EventEmitter {
return false
}
// prepare if there are values
if (!this.values || !Array.isArray(this.values)) {
if (!this.values) {
return false
}
return this.values.length > 0
Expand Down Expand Up @@ -181,10 +178,10 @@ class Query extends EventEmitter {
if (this.text && previous && this.text !== previous) {
return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)
}
if (this.values && !Array.isArray(this.values)) {
return new Error('Query values must be an array')
}
if (this.requiresPreparation()) {
if (this.values && !Array.isArray(this.values)) {
return new Error('Query values must be an array')
}
this.prepare(connection)
} else {
connection.query(this.text)
Expand All @@ -201,7 +198,7 @@ class Query extends EventEmitter {
}

handleEndOfBatchResponse(connection) {
if (this.values && this.values.copyStream) { //copy from stdin
if (this.copyStream) { //copy from stdin
connection.sendCopyDone()
}
// else noop, backend will send CopyDoneResponse for copy from local file to continue the process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('Running Copy From Local Stdin Commands', function () {
it ('succeeds with basic copy from stdin command', function(done) {
const readableStream = fs.createReadStream(goodFilePath, { encoding: 'utf8' })
readableStream.on('open', () => {
pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => {
pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => {
assert.equal(err, undefined)
assert.equal(res.rows[0]['Rows Loaded'], 5)
done()
Expand All @@ -55,7 +55,7 @@ describe('Running Copy From Local Stdin Commands', function () {
it ('succeeds with a binary input stream', function(done) {
const readableStream = fs.createReadStream(goodFilePath)
readableStream.on('open', () => {
pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => {
pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => {
assert.equal(err, undefined)
assert.equal(res.rows[0]['Rows Loaded'], 5)
done()
Expand All @@ -78,7 +78,7 @@ describe('Running Copy From Local Stdin Commands', function () {
writableStream.end(() => {
const readableStream = fs.createReadStream(largeFilePath, { encoding: 'utf8' })
readableStream.on('open', () => {
pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => {
pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => {
try {
assert.equal(err, undefined)
assert.equal(res.rows[0]['Rows Loaded'], requiredLines)
Expand All @@ -94,7 +94,7 @@ describe('Running Copy From Local Stdin Commands', function () {
it('returns rejected rows with RETURNREJECTED specified', function(done) {
const readableStream = fs.createReadStream(badFilePath, { encoding: 'utf8' })
readableStream.on('open', () => {
pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => {
pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => {
assert.equal(err, undefined)
assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents
assert.deepEqual(res.getRejectedRows(), [2, 4]) // rows 2 and 4 are malformed
Expand All @@ -105,7 +105,7 @@ describe('Running Copy From Local Stdin Commands', function () {

it('behaves properly when input stream does not exist/is invalid', function(done) {
const badStream = null
pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: badStream}, (err) => {
pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: badStream}, (err) => {
assert.ok(err.message.includes("Cannot perform copy operation. Stream must be an instance of stream.Readable"))
done()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ var createErorrClient = function () {

const suite = new helper.Suite('error handling')

suite.test('sending non-array argument as values causes an error callback', (done) => {
const client = new Client()
client.connect((err) => {
if (err) {
return done(err)
}
client.query('select ?::varchar as name', 'foo', (err) => {
assert(err.message.includes("Query values must be an array"))
client.query('SELECT ?::varchar as name', ['foo'], (err, res) => {
assert(!err)
assert.equal(res.rows[0].name, 'foo')
client.end(done)
})
})
})
})

suite.test('re-using connections results in error callback', (done) => {
const client = new Client()
client.connect((err) => {
Expand Down

0 comments on commit 3299b07

Please sign in to comment.