Skip to content

Commit

Permalink
refactor: Add arg cpuExecutor to ConnectorFactory::newConnector
Browse files Browse the repository at this point in the history
Add an additional argument, cpuExecutor, to the ConnectorFactory's
newConnector interface. Connectors could send async operators to this
cpuExecutor to prevent occupying the driver executor.
  • Loading branch information
gggrace14 committed Dec 14, 2024
1 parent 42bd38a commit c90f8b4
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
3 changes: 2 additions & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ class ConnectorFactory {
virtual std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) = 0;
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) = 0;

private:
const std::string name_;
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ class FuzzerConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, ioExecutor);
}
};

Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ class HiveConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/tests/ConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> /*config*/,
folly::Executor* /*executor*/ = nullptr) override {
folly::Executor* /*ioExecutor*/ = nullptr,
folly::Executor* /*cpuExecutor*/ = nullptr) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/tpch/TpchConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ class TpchConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/AsyncConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<connector::Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* /* executor */) override {
folly::Executor* /* ioExecutor */,
folly::Executor* /* cpuExecutor */) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down

0 comments on commit c90f8b4

Please sign in to comment.