Skip to content

Commit 4fc9f9e

Browse files
authored
apacheGH-47715: [C++][FlightRPC] ODBC scroll fetch implementation (apache#48041)
### Rationale for this change ODBC scroll fetch implementation. This is part of advance ODBC data fetching. ### What changes are included in this PR? - Implement SQLFetchScroll, only fetch orientation `SQL_FETCH_NEXT` is supported, which makes it has same effect as SQLFetch - Tests ### Are these changes tested? Tested in local MSVC. ### Are there any user-facing changes? N/A * GitHub Issue: apache#47715 Authored-by: Alina (Xi) Li <[email protected]> Signed-off-by: James Duong <[email protected]>
1 parent d25f387 commit 4fc9f9e

File tree

2 files changed

+129
-12
lines changed

2 files changed

+129
-12
lines changed

cpp/src/arrow/flight/sql/odbc/odbc_api.cc

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,8 +1075,31 @@ SQLRETURN SQLFetchScroll(SQLHSTMT stmt, SQLSMALLINT fetch_orientation,
10751075
ARROW_LOG(DEBUG) << "SQLFetchScroll called with stmt: " << stmt
10761076
<< ", fetch_orientation: " << fetch_orientation
10771077
<< ", fetch_offset: " << fetch_offset;
1078-
// GH-47715 TODO: Implement SQLFetchScroll
1079-
return SQL_INVALID_HANDLE;
1078+
1079+
using ODBC::ODBCDescriptor;
1080+
using ODBC::ODBCStatement;
1081+
return ODBCStatement::ExecuteWithDiagnostics(stmt, SQL_ERROR, [=]() {
1082+
// Only SQL_FETCH_NEXT forward-only fetching orientation is supported,
1083+
// meaning the behavior of SQLExtendedFetch is same as SQLFetch.
1084+
if (fetch_orientation != SQL_FETCH_NEXT) {
1085+
throw DriverException("Optional feature not supported.", "HYC00");
1086+
}
1087+
// Ignore fetch_offset as it's not applicable to SQL_FETCH_NEXT
1088+
ARROW_UNUSED(fetch_offset);
1089+
1090+
ODBCStatement* statement = reinterpret_cast<ODBCStatement*>(stmt);
1091+
1092+
// The SQL_ATTR_ROW_ARRAY_SIZE statement attribute specifies the number of rows in the
1093+
// rowset.
1094+
ODBCDescriptor* ard = statement->GetARD();
1095+
size_t rows = static_cast<size_t>(ard->GetArraySize());
1096+
if (statement->Fetch(rows)) {
1097+
return SQL_SUCCESS;
1098+
} else {
1099+
// Reached the end of rowset
1100+
return SQL_NO_DATA;
1101+
}
1102+
});
10801103
}
10811104

10821105
SQLRETURN SQLBindCol(SQLHSTMT stmt, SQLUSMALLINT record_number, SQLSMALLINT c_type,

cpp/src/arrow/flight/sql/odbc/tests/statement_test.cc

Lines changed: 104 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ TYPED_TEST(StatementTest, TestSQLExecDirectSimpleQuery) {
4848

4949
SQLINTEGER val;
5050

51-
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, 0));
51+
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, nullptr));
5252
// Verify 1 is returned
5353
EXPECT_EQ(1, val);
5454

5555
ASSERT_EQ(SQL_NO_DATA, SQLFetch(this->stmt));
5656

57-
ASSERT_EQ(SQL_ERROR, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, 0));
57+
ASSERT_EQ(SQL_ERROR, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, nullptr));
5858
// Invalid cursor state
5959
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState24000);
6060
}
@@ -82,14 +82,14 @@ TYPED_TEST(StatementTest, TestSQLExecuteSimpleQuery) {
8282
ASSERT_EQ(SQL_SUCCESS, SQLFetch(this->stmt));
8383

8484
SQLINTEGER val;
85-
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, 0));
85+
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, nullptr));
8686

8787
// Verify 1 is returned
8888
EXPECT_EQ(1, val);
8989

9090
ASSERT_EQ(SQL_NO_DATA, SQLFetch(this->stmt));
9191

92-
ASSERT_EQ(SQL_ERROR, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, 0));
92+
ASSERT_EQ(SQL_ERROR, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, nullptr));
9393
// Invalid cursor state
9494
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState24000);
9595
}
@@ -715,6 +715,100 @@ TYPED_TEST(StatementTest, TestSQLExecDirectRowFetching) {
715715
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState24000);
716716
}
717717

718+
TYPED_TEST(StatementTest, TestSQLFetchScrollRowFetching) {
719+
SQLLEN rows_fetched;
720+
SQLSetStmtAttr(this->stmt, SQL_ATTR_ROWS_FETCHED_PTR, &rows_fetched, 0);
721+
722+
std::wstring wsql =
723+
LR"(
724+
SELECT 1 AS small_table
725+
UNION ALL
726+
SELECT 2
727+
UNION ALL
728+
SELECT 3;
729+
)";
730+
std::vector<SQLWCHAR> sql0(wsql.begin(), wsql.end());
731+
732+
ASSERT_EQ(SQL_SUCCESS,
733+
SQLExecDirect(this->stmt, &sql0[0], static_cast<SQLINTEGER>(sql0.size())));
734+
735+
// Fetch row 1
736+
ASSERT_EQ(SQL_SUCCESS, SQLFetchScroll(this->stmt, SQL_FETCH_NEXT, 0));
737+
738+
SQLINTEGER val;
739+
SQLLEN buf_len = sizeof(val);
740+
SQLLEN ind;
741+
742+
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, buf_len, &ind));
743+
// Verify 1 is returned
744+
EXPECT_EQ(1, val);
745+
// Verify 1 row is fetched
746+
EXPECT_EQ(1, rows_fetched);
747+
748+
// Fetch row 2
749+
ASSERT_EQ(SQL_SUCCESS, SQLFetchScroll(this->stmt, SQL_FETCH_NEXT, 0));
750+
751+
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, buf_len, &ind));
752+
753+
// Verify 2 is returned
754+
EXPECT_EQ(2, val);
755+
// Verify 1 row is fetched in the last SQLFetchScroll call
756+
EXPECT_EQ(1, rows_fetched);
757+
758+
// Fetch row 3
759+
ASSERT_EQ(SQL_SUCCESS, SQLFetchScroll(this->stmt, SQL_FETCH_NEXT, 0));
760+
761+
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, buf_len, &ind));
762+
763+
// Verify 3 is returned
764+
EXPECT_EQ(3, val);
765+
// Verify 1 row is fetched in the last SQLFetchScroll call
766+
EXPECT_EQ(1, rows_fetched);
767+
768+
// Verify result set has no more data beyond row 3
769+
ASSERT_EQ(SQL_NO_DATA, SQLFetchScroll(this->stmt, SQL_FETCH_NEXT, 0));
770+
771+
ASSERT_EQ(SQL_ERROR, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, &ind));
772+
// Invalid cursor state
773+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState24000);
774+
}
775+
776+
TYPED_TEST(StatementTest, TestSQLFetchScrollUnsupportedOrientation) {
777+
// SQL_FETCH_NEXT is the only supported fetch orientation.
778+
779+
std::wstring wsql = L"SELECT 1;";
780+
std::vector<SQLWCHAR> sql0(wsql.begin(), wsql.end());
781+
782+
ASSERT_EQ(SQL_SUCCESS,
783+
SQLExecDirect(this->stmt, &sql0[0], static_cast<SQLINTEGER>(sql0.size())));
784+
785+
ASSERT_EQ(SQL_ERROR, SQLFetchScroll(this->stmt, SQL_FETCH_PRIOR, 0));
786+
787+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorStateHYC00);
788+
789+
SQLLEN fetch_offset = 1;
790+
ASSERT_EQ(SQL_ERROR, SQLFetchScroll(this->stmt, SQL_FETCH_RELATIVE, fetch_offset));
791+
792+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorStateHYC00);
793+
794+
ASSERT_EQ(SQL_ERROR, SQLFetchScroll(this->stmt, SQL_FETCH_ABSOLUTE, fetch_offset));
795+
796+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorStateHYC00);
797+
798+
ASSERT_EQ(SQL_ERROR, SQLFetchScroll(this->stmt, SQL_FETCH_FIRST, 0));
799+
800+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorStateHYC00);
801+
802+
ASSERT_EQ(SQL_ERROR, SQLFetchScroll(this->stmt, SQL_FETCH_LAST, 0));
803+
804+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorStateHYC00);
805+
806+
ASSERT_EQ(SQL_ERROR, SQLFetchScroll(this->stmt, SQL_FETCH_BOOKMARK, fetch_offset));
807+
808+
// DM returns state HY106 for SQL_FETCH_BOOKMARK
809+
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorStateHY106);
810+
}
811+
718812
TYPED_TEST(StatementTest, TestSQLExecDirectVarcharTruncation) {
719813
std::wstring wsql = L"SELECT 'VERY LONG STRING here' AS string_col;";
720814
std::vector<SQLWCHAR> sql0(wsql.begin(), wsql.end());
@@ -881,7 +975,7 @@ TYPED_TEST(StatementTest, DISABLED_TestSQLExecDirectFloatTruncation) {
881975
int16_t ssmall_int_val;
882976

883977
ASSERT_EQ(SQL_SUCCESS_WITH_INFO,
884-
SQLGetData(this->stmt, 1, SQL_C_SSHORT, &ssmall_int_val, 0, 0));
978+
SQLGetData(this->stmt, 1, SQL_C_SSHORT, &ssmall_int_val, 0, nullptr));
885979
// Verify float truncation is reported
886980
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState01S07);
887981

@@ -929,7 +1023,7 @@ TEST_F(StatementMockTest, TestSQLExecDirectTruncationQueryNullIndicator) {
9291023
ASSERT_EQ(SQL_SUCCESS, SQLFetch(this->stmt));
9301024

9311025
SQLINTEGER val;
932-
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, 0));
1026+
ASSERT_EQ(SQL_SUCCESS, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, nullptr));
9331027
// Verify 1 is returned for non-truncation case.
9341028
EXPECT_EQ(1, val);
9351029

@@ -938,7 +1032,7 @@ TEST_F(StatementMockTest, TestSQLExecDirectTruncationQueryNullIndicator) {
9381032
SQLCHAR char_val[len];
9391033
SQLLEN buf_len = sizeof(SQLCHAR) * len;
9401034
ASSERT_EQ(SQL_SUCCESS_WITH_INFO,
941-
SQLGetData(this->stmt, 2, SQL_C_CHAR, &char_val, buf_len, 0));
1035+
SQLGetData(this->stmt, 2, SQL_C_CHAR, &char_val, buf_len, nullptr));
9421036
// Verify string truncation is reported
9431037
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState01004);
9441038

@@ -948,15 +1042,15 @@ TEST_F(StatementMockTest, TestSQLExecDirectTruncationQueryNullIndicator) {
9481042
size_t wchar_size = GetSqlWCharSize();
9491043
buf_len = wchar_size * len2;
9501044
ASSERT_EQ(SQL_SUCCESS_WITH_INFO,
951-
SQLGetData(this->stmt, 3, SQL_C_WCHAR, &wchar_val, buf_len, 0));
1045+
SQLGetData(this->stmt, 3, SQL_C_WCHAR, &wchar_val, buf_len, nullptr));
9521046
// Verify string truncation is reported
9531047
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState01004);
9541048

9551049
// varbinary
9561050
std::vector<int8_t> varbinary_val(3);
9571051
buf_len = varbinary_val.size();
9581052
ASSERT_EQ(SQL_SUCCESS_WITH_INFO,
959-
SQLGetData(this->stmt, 4, SQL_C_BINARY, &varbinary_val[0], buf_len, 0));
1053+
SQLGetData(this->stmt, 4, SQL_C_BINARY, &varbinary_val[0], buf_len, nullptr));
9601054
// Verify binary truncation is reported
9611055
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState01004);
9621056
}
@@ -975,7 +1069,7 @@ TEST_F(StatementRemoteTest, TestSQLExecDirectNullQueryNullIndicator) {
9751069

9761070
SQLINTEGER val;
9771071

978-
ASSERT_EQ(SQL_ERROR, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, 0));
1072+
ASSERT_EQ(SQL_ERROR, SQLGetData(this->stmt, 1, SQL_C_LONG, &val, 0, nullptr));
9791073
// Verify invalid null indicator is reported, as it is required
9801074
VerifyOdbcErrorState(SQL_HANDLE_STMT, this->stmt, kErrorState22002);
9811075
}

0 commit comments

Comments
 (0)