From 3299b071a208f100d861ecd5860162bae1225558 Mon Sep 17 00:00:00 2001 From: Danny Mickens Date: Fri, 26 Jan 2024 11:52:45 -0500 Subject: [PATCH] Refactor how the stream is provided to the driver for copy from stdin --- packages/vertica-nodejs/README.md | 2 +- packages/vertica-nodejs/lib/query.js | 15 ++++++--------- .../client/copy-local-stdin-tests.js | 10 +++++----- .../integration/client/error-handling-tests.js | 17 +++++++++++++++++ 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/packages/vertica-nodejs/README.md b/packages/vertica-nodejs/README.md index 557dee2b..e4283455 100644 --- a/packages/vertica-nodejs/README.md +++ b/packages/vertica-nodejs/README.md @@ -363,7 +363,7 @@ Copy from local stdin in vertica-nodejs can be better described as copy from loc const readableStream = fs.createReadStream(filePath) // assumes filePath is a string containing the path to a data file client.query("CREATE LOCAL TEMP TABLE myTable(x int)", (err) => { if (err) console.log(err) - client.query("COPY myTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => { + client.query({text: "COPY myTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { console.log(err || res.getRejectedRows()) client.end() }) diff --git a/packages/vertica-nodejs/lib/query.js b/packages/vertica-nodejs/lib/query.js index d4ea01ad..6bd9693b 100644 --- a/packages/vertica-nodejs/lib/query.js +++ b/packages/vertica-nodejs/lib/query.js @@ -28,13 +28,13 @@ class Query extends EventEmitter { super() config = utils.normalizeQueryConfig(config, values, callback) - this.text = config.text this.values = config.values this.rows = config.rows this.types = config.types this.name = config.name this.binary = config.binary || false + this.copyStream = config.copyStream || null // use unique portal name each time this.portal = config.portal || '' this.callback = config.callback @@ -50,9 +50,6 @@ class Query extends EventEmitter { this._canceledDueToError = false this._activeError = false this._promise = null - if (this.values) { - this.copyStream = this.values.copyStream || null - } } requiresPreparation() { @@ -70,7 +67,7 @@ class Query extends EventEmitter { return false } // prepare if there are values - if (!this.values || !Array.isArray(this.values)) { + if (!this.values) { return false } return this.values.length > 0 @@ -181,10 +178,10 @@ class Query extends EventEmitter { if (this.text && previous && this.text !== previous) { return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`) } + if (this.values && !Array.isArray(this.values)) { + return new Error('Query values must be an array') + } if (this.requiresPreparation()) { - if (this.values && !Array.isArray(this.values)) { - return new Error('Query values must be an array') - } this.prepare(connection) } else { connection.query(this.text) @@ -201,7 +198,7 @@ class Query extends EventEmitter { } handleEndOfBatchResponse(connection) { - if (this.values && this.values.copyStream) { //copy from stdin + if (this.copyStream) { //copy from stdin connection.sendCopyDone() } // else noop, backend will send CopyDoneResponse for copy from local file to continue the process diff --git a/packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js b/packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js index 4449a5d4..e97a380a 100644 --- a/packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js +++ b/packages/vertica-nodejs/mochatest/integration/client/copy-local-stdin-tests.js @@ -44,7 +44,7 @@ describe('Running Copy From Local Stdin Commands', function () { it ('succeeds with basic copy from stdin command', function(done) { const readableStream = fs.createReadStream(goodFilePath, { encoding: 'utf8' }) readableStream.on('open', () => { - pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { assert.equal(err, undefined) assert.equal(res.rows[0]['Rows Loaded'], 5) done() @@ -55,7 +55,7 @@ describe('Running Copy From Local Stdin Commands', function () { it ('succeeds with a binary input stream', function(done) { const readableStream = fs.createReadStream(goodFilePath) readableStream.on('open', () => { - pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { assert.equal(err, undefined) assert.equal(res.rows[0]['Rows Loaded'], 5) done() @@ -78,7 +78,7 @@ describe('Running Copy From Local Stdin Commands', function () { writableStream.end(() => { const readableStream = fs.createReadStream(largeFilePath, { encoding: 'utf8' }) readableStream.on('open', () => { - pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { try { assert.equal(err, undefined) assert.equal(res.rows[0]['Rows Loaded'], requiredLines) @@ -94,7 +94,7 @@ describe('Running Copy From Local Stdin Commands', function () { it('returns rejected rows with RETURNREJECTED specified', function(done) { const readableStream = fs.createReadStream(badFilePath, { encoding: 'utf8' }) readableStream.on('open', () => { - pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: readableStream}, (err, res) => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: readableStream}, (err, res) => { assert.equal(err, undefined) assert.equal(res.rows[0]['Rows Loaded'], 3) // 3 good rows in badFileContents assert.deepEqual(res.getRejectedRows(), [2, 4]) // rows 2 and 4 are malformed @@ -105,7 +105,7 @@ describe('Running Copy From Local Stdin Commands', function () { it('behaves properly when input stream does not exist/is invalid', function(done) { const badStream = null - pool.query("COPY copyTable FROM LOCAL STDIN RETURNREJECTED", {copyStream: badStream}, (err) => { + pool.query({text: "COPY copyTable FROM LOCAL STDIN RETURNREJECTED", copyStream: badStream}, (err) => { assert.ok(err.message.includes("Cannot perform copy operation. Stream must be an instance of stream.Readable")) done() }) diff --git a/packages/vertica-nodejs/test/integration/client/error-handling-tests.js b/packages/vertica-nodejs/test/integration/client/error-handling-tests.js index 96a47f88..6c75a750 100644 --- a/packages/vertica-nodejs/test/integration/client/error-handling-tests.js +++ b/packages/vertica-nodejs/test/integration/client/error-handling-tests.js @@ -18,6 +18,23 @@ var createErorrClient = function () { const suite = new helper.Suite('error handling') +suite.test('sending non-array argument as values causes an error callback', (done) => { + const client = new Client() + client.connect((err) => { + if (err) { + return done(err) + } + client.query('select ?::varchar as name', 'foo', (err) => { + assert(err.message.includes("Query values must be an array")) + client.query('SELECT ?::varchar as name', ['foo'], (err, res) => { + assert(!err) + assert.equal(res.rows[0].name, 'foo') + client.end(done) + }) + }) + }) +}) + suite.test('re-using connections results in error callback', (done) => { const client = new Client() client.connect((err) => {