diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml index 23515edc761bf..f4e124f9e63f8 100644 --- a/.idea/inspectionProfiles/Project_Default.xml +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -1,776 +1,764 @@ - - + \ No newline at end of file diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeNodeNameTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeNodeNameTask.java new file mode 100644 index 0000000000000..fe885b3d9b038 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeNodeNameTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.platform; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeTaskAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Test task returning Node name. + */ +public class PlatformComputeNodeNameTask extends ComputeTaskAdapter { + /** {@inheritDoc} */ + @NotNull @Override public Map map(List subgrid, + @Nullable Object arg) { + return Collections.singletonMap(new Job(), F.rand(subgrid)); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object reduce(List results) { + return results.get(0).getData(); + } + + /** + * Job. + */ + private static class Job extends ComputeJobAdapter { + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + return ignite.name(); + } + } +} diff --git a/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h b/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h index 22955edb1cf63..d5c3d6001e30c 100644 --- a/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h +++ b/modules/platforms/cpp/binary/include/ignite/impl/interop/interop_memory.h @@ -48,7 +48,25 @@ namespace ignite /** Flag: acquired. */ const int IGNITE_MEM_FLAG_ACQUIRED = 0x4; - + + /** + * A helper union to bitwise conversion from int32_t to float and back. + */ + union BinaryFloatInt32 + { + float f; + int32_t i; + }; + + /** + * A helper union to bitwise conversion from int64_t to double and back. + */ + union BinaryDoubleInt64 + { + double d; + int64_t i; + }; + /** * Interop memory. */ diff --git a/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp b/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp index 9a43ec153c4a5..907c840343d91 100644 --- a/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp +++ b/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp @@ -44,18 +44,6 @@ namespace ignite { namespace interop { - union BinaryInt32Float - { - int32_t i; - float f; - }; - - union BinaryInt64Double - { - int64_t i; - double d; - }; - InteropInputStream::InteropInputStream(InteropMemory* mem) { this->mem = mem; @@ -158,7 +146,7 @@ namespace ignite float InteropInputStream::ReadFloat() { - BinaryInt32Float u; + BinaryFloatInt32 u; u.i = ReadInt32(); @@ -172,7 +160,7 @@ namespace ignite double InteropInputStream::ReadDouble() { - BinaryInt64Double u; + BinaryDoubleInt64 u; u.i = ReadInt64(); diff --git a/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp b/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp index 3e8160d8ecd4b..bbf28f6fc9ee5 100644 --- a/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp +++ b/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp @@ -43,18 +43,6 @@ namespace ignite { namespace interop { - union BinaryFloatInt32 - { - float f; - int32_t i; - }; - - union BinaryDoubleInt64 - { - double d; - int64_t i; - }; - InteropOutputStream::InteropOutputStream(InteropMemory* mem) { this->mem = mem; diff --git a/modules/platforms/cpp/core-test/CMakeLists.txt b/modules/platforms/cpp/core-test/CMakeLists.txt index d308aa10d92b2..339da20188ff1 100644 --- a/modules/platforms/cpp/core-test/CMakeLists.txt +++ b/modules/platforms/cpp/core-test/CMakeLists.txt @@ -37,6 +37,7 @@ set(SOURCES src/reference_test.cpp src/continuous_query_test.cpp src/concurrent_test.cpp src/compute_test.cpp + src/compute_java_test.cpp src/ignition_test.cpp src/interop_memory_test.cpp src/interop_test.cpp diff --git a/modules/platforms/cpp/core-test/include/ignite/complex_type.h b/modules/platforms/cpp/core-test/include/ignite/complex_type.h index 241c88bbd1b4b..bc8f1e8213a2f 100644 --- a/modules/platforms/cpp/core-test/include/ignite/complex_type.h +++ b/modules/platforms/cpp/core-test/include/ignite/complex_type.h @@ -15,8 +15,8 @@ * limitations under the License. */ -#ifndef _IGNITE_ODBC_TEST_COMPLEX_TYPE -#define _IGNITE_ODBC_TEST_COMPLEX_TYPE +#ifndef _IGNITE_CORE_TEST_COMPLEX_TYPE +#define _IGNITE_CORE_TEST_COMPLEX_TYPE #include #include @@ -120,4 +120,4 @@ namespace ignite } } -#endif // _IGNITE_ODBC_TEST_COMPLEX_TYPE +#endif // _IGNITE_CORE_TEST_COMPLEX_TYPE diff --git a/modules/platforms/cpp/core-test/include/ignite/compute_types.h b/modules/platforms/cpp/core-test/include/ignite/compute_types.h new file mode 100644 index 0000000000000..8b62842125f63 --- /dev/null +++ b/modules/platforms/cpp/core-test/include/ignite/compute_types.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_CORE_TEST_COMPUTE_TYPES +#define _IGNITE_CORE_TEST_COMPUTE_TYPES + +#include +#include + +#include +#include + +namespace ignite_test +{ + /* + * Check if cluster node contains an attribute with name provided. + */ + class HasAttrName : public ignite::IgnitePredicate + { + public: + HasAttrName(std::string name) : + name(name) + { + // No-op. + } + + bool operator()(ignite::cluster::ClusterNode& node) + { + std::vector attrs = node.GetAttributes(); + + return std::find(attrs.begin(), attrs.end(), name) != attrs.end(); + } + + private: + std::string name; + }; + + /* + * Check if cluster node contains an attribute with value provided. + */ + class HasAttrValue : public ignite::IgnitePredicate + { + public: + HasAttrValue(std::string name, std::string val) : + name(name), + val(val) + { + // No-op. + } + + bool operator()(ignite::cluster::ClusterNode& node) + { + try { + return node.GetAttribute(name) == this->val; + } + catch (...) {} + + return false; + } + + private: + std::string name; + std::string val; + }; +} + +#endif // _IGNITE_CORE_TEST_COMPUTE_TYPES diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj index fc81a15e667a4..7f6f5ee8e05a0 100644 --- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj +++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj @@ -95,6 +95,7 @@ + diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters index 79abb4e891eef..e73d2c0296bbf 100644 --- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters +++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters @@ -88,6 +88,12 @@ Code + + Code + + + Code + Code @@ -111,6 +117,9 @@ Code\Types + + Code\Types + Code\Types diff --git a/modules/platforms/cpp/core-test/src/cluster_group_test.cpp b/modules/platforms/cpp/core-test/src/cluster_group_test.cpp index 1b2545b3505a9..242c63fa00c01 100644 --- a/modules/platforms/cpp/core-test/src/cluster_group_test.cpp +++ b/modules/platforms/cpp/core-test/src/cluster_group_test.cpp @@ -19,6 +19,7 @@ #include #include +#include using namespace ignite; using namespace ignite::common; @@ -27,56 +28,6 @@ using namespace ignite::cluster; using namespace boost::unit_test; -/* - * Check if cluster node contain the attribute with name provided. - */ -class HasAttrName : public IgnitePredicate -{ -public: - HasAttrName(std::string name) : - name(name) - { - // No-op. - } - - bool operator()(ClusterNode& node) - { - std::vector attrs = node.GetAttributes(); - - return std::find(attrs.begin(), attrs.end(), name) != attrs.end(); - } - -private: - std::string name; -}; - -/* - * Check if cluster node contain the attribute with value provided. - */ -class HasAttrValue : public IgnitePredicate -{ -public: - HasAttrValue(std::string name, std::string val) : - name(name), val(val) - { - // No-op. - } - - bool operator()(ClusterNode& node) - { - try { - return node.GetAttribute(name) == this->val; - } - catch (...) {} - - return false; - } - -private: - std::string name; - std::string val; -}; - /* * Predicate holder is required to demonstrate * how to pass IgnitePredicate pointer to the stl container. @@ -397,8 +348,8 @@ BOOST_AUTO_TEST_CASE(IgniteForPredicate) ClusterGroup groupServers = group0.ForServers(); ClusterGroup groupClients = group0.ForClients(); - ClusterGroup group1 = groupServers.ForPredicate(new HasAttrValue("TestAttribute", "Value0")); - ClusterGroup group2 = groupServers.ForPredicate(new HasAttrValue("TestAttribute", "Value1")); + ClusterGroup group1 = groupServers.ForPredicate(new ignite_test::HasAttrValue("TestAttribute", "Value0")); + ClusterGroup group2 = groupServers.ForPredicate(new ignite_test::HasAttrValue("TestAttribute", "Value1")); ClusterGroup group3 = groupServers.ForClients(); BOOST_REQUIRE(group0.GetNodes().size() == 4); @@ -408,10 +359,10 @@ BOOST_AUTO_TEST_CASE(IgniteForPredicate) BOOST_REQUIRE(group2.GetNodes().size() == 2); BOOST_REQUIRE(group3.GetNodes().size() == 0); - ClusterGroup group4 = group0.ForPredicate(new HasAttrName("TestAttribute")); - ClusterGroup group5 = group4.ForPredicate(new HasAttrValue("TestAttribute", "Value0")); - ClusterGroup group6 = group4.ForPredicate(new HasAttrValue("TestAttribute", "Value1")); - ClusterGroup group7 = group4.ForPredicate(new HasAttrValue("TestAttribute", "ValueInvalid")); + ClusterGroup group4 = group0.ForPredicate(new ignite_test::HasAttrName("TestAttribute")); + ClusterGroup group5 = group4.ForPredicate(new ignite_test::HasAttrValue("TestAttribute", "Value0")); + ClusterGroup group6 = group4.ForPredicate(new ignite_test::HasAttrValue("TestAttribute", "Value1")); + ClusterGroup group7 = group4.ForPredicate(new ignite_test::HasAttrValue("TestAttribute", "ValueInvalid")); BOOST_REQUIRE(group4.GetNodes().size() == 3); BOOST_REQUIRE(group5.GetNodes().size() == 1); @@ -534,7 +485,7 @@ BOOST_AUTO_TEST_CASE(IgniteGetPredicate) { IgniteCluster cluster = server1.GetCluster(); ClusterGroup group0 = cluster.AsClusterGroup(); - ClusterGroup group1 = group0.ForPredicate(new HasAttrValue("TestAttribute", "Value1")); + ClusterGroup group1 = group0.ForPredicate(new ignite_test::HasAttrValue("TestAttribute", "Value1")); std::vector nodes0 = group0.GetNodes(); std::vector nodes1 = group1.GetNodes(); diff --git a/modules/platforms/cpp/core-test/src/compute_java_test.cpp b/modules/platforms/cpp/core-test/src/compute_java_test.cpp new file mode 100644 index 0000000000000..61e64f581baf3 --- /dev/null +++ b/modules/platforms/cpp/core-test/src/compute_java_test.cpp @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include + +using namespace ignite; +using namespace ignite::cache; +using namespace ignite::cluster; +using namespace ignite::compute; +using namespace ignite::common::concurrent; +using namespace ignite::impl; +using namespace ignite_test; + +using namespace boost::unit_test; + +namespace +{ + /** Echo task name. */ + const std::string ECHO_TASK("org.apache.ignite.platform.PlatformComputeEchoTask"); + + /** Node name task name. */ + const std::string NODE_NAME_TASK("org.apache.ignite.platform.PlatformComputeNodeNameTask"); + + /** Echo type: null. */ + const int32_t ECHO_TYPE_NULL = 0; + + /** Echo type: byte. */ + const int32_t ECHO_TYPE_BYTE = 1; + + /** Echo type: bool. */ + const int32_t ECHO_TYPE_BOOL = 2; + + /** Echo type: short. */ + const int32_t ECHO_TYPE_SHORT = 3; + + /** Echo type: char. */ + const int32_t ECHO_TYPE_CHAR = 4; + + /** Echo type: int. */ + const int32_t ECHO_TYPE_INT = 5; + + /** Echo type: long. */ + const int32_t ECHO_TYPE_LONG = 6; + + /** Echo type: float. */ + const int32_t ECHO_TYPE_FLOAT = 7; + + /** Echo type: double. */ + const int32_t ECHO_TYPE_DOUBLE = 8; + + /** Echo type: object. */ + const int32_t ECHO_TYPE_OBJECT = 12; + + /** Echo type: uuid. */ + const int32_t ECHO_TYPE_UUID = 22; +} + +/* + * Test setup fixture. + */ +struct ComputeJavaTestSuiteFixture +{ + Ignite node; + + static Ignite MakeNode(int idx) + { + std::stringstream ss_config; + + ss_config << "compute-server" << idx; +#ifdef IGNITE_TESTS_32 + ss_config << "-32"; +#endif + ss_config << ".xml"; + + std::stringstream ss_name; + + ss_name << "ComputeNode" << idx; + + std::string name = ss_name.str(); + std::string config = ss_config.str(); + + return StartNode(config.c_str(), name.c_str()); + } + + /* + * Constructor. + */ + ComputeJavaTestSuiteFixture() : + node(MakeNode(0)) + { + // No-op. + } + + /* + * Destructor. + */ + ~ComputeJavaTestSuiteFixture() + { + Ignition::StopAll(true); + } + + /** + * Get default cache. + * + * @return Default cache. + */ + template + Cache GetDefaultCache() + { + return node.GetOrCreateCache("default"); + } +}; + +/** + * Binarizable object for task tests. + */ +class PlatformComputeBinarizable +{ +public: + /** + * Constructor. + */ + PlatformComputeBinarizable() + { + // No-op. + } + + /** + * Constructor, + * + * @param field Field. + */ + PlatformComputeBinarizable(int32_t field) : + field(field) + { + // No-op. + } + + /** Field. */ + int32_t field; +}; + + +namespace ignite +{ + namespace binary + { + template<> + struct BinaryType : BinaryTypeDefaultAll + { + static void GetTypeName(std::string& dst) + { + dst = "PlatformComputeBinarizable"; + } + + static void Write(BinaryWriter& writer, const PlatformComputeBinarizable& obj) + { + writer.WriteInt32("field", obj.field); + } + + static void Read(BinaryReader& reader, PlatformComputeBinarizable& dst) + { + dst.field = reader.ReadInt32("field"); + } + }; + } +} + +BOOST_FIXTURE_TEST_SUITE(ComputeJavaTestSuite, ComputeJavaTestSuiteFixture) + +BOOST_AUTO_TEST_CASE(EchoTaskNull) +{ + Compute compute = node.GetCompute(); + + int* res = compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_NULL); + + BOOST_CHECK(res == 0); +} + +BOOST_AUTO_TEST_CASE(EchoTaskNullAsync) +{ + Compute compute = node.GetCompute(); + + int* res = compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_NULL).GetValue(); + + BOOST_CHECK(res == 0); +} + +BOOST_AUTO_TEST_CASE(EchoTaskPrimitives) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_BYTE)); + BOOST_CHECK_EQUAL(true, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_BOOL)); + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_SHORT)); + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_CHAR)); + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_INT)); + BOOST_CHECK_EQUAL(1LL, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_LONG)); + BOOST_CHECK_EQUAL(1.0f, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_FLOAT)); + BOOST_CHECK_EQUAL(1.0, compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_DOUBLE)); +} + +BOOST_AUTO_TEST_CASE(EchoTaskPrimitivesAsync) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_BYTE).GetValue()); + BOOST_CHECK_EQUAL(true, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_BOOL).GetValue()); + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_SHORT).GetValue()); + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_CHAR).GetValue()); + BOOST_CHECK_EQUAL(1, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_INT).GetValue()); + BOOST_CHECK_EQUAL(1LL, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_LONG).GetValue()); + BOOST_CHECK_EQUAL(1.0f, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_FLOAT).GetValue()); + BOOST_CHECK_EQUAL(1.0, compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_DOUBLE).GetValue()); +} + +BOOST_AUTO_TEST_CASE(EchoTaskObject) +{ + Compute compute = node.GetCompute(); + Cache cache = GetDefaultCache(); + + for (int32_t i = 0; i < 100; ++i) + { + int32_t value = i * 42; + cache.Put(ECHO_TYPE_OBJECT, value); + + PlatformComputeBinarizable res = + compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_OBJECT); + + BOOST_CHECK_EQUAL(value, res.field); + } +} + +BOOST_AUTO_TEST_CASE(EchoTaskObjectAsync) +{ + Compute compute = node.GetCompute(); + Cache cache = GetDefaultCache(); + + for (int32_t i = 0; i < 100; ++i) + { + int32_t value = i * 42; + cache.Put(ECHO_TYPE_OBJECT, value); + + PlatformComputeBinarizable res = + compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_OBJECT).GetValue(); + + BOOST_CHECK_EQUAL(value, res.field); + } +} + +BOOST_AUTO_TEST_CASE(EchoTaskGuid) +{ + Compute compute = node.GetCompute(); + Cache cache = GetDefaultCache(); + + for (int32_t i = 0; i < 100; ++i) + { + Guid value(i * 479001599LL, i * 150209LL); + + cache.Put(ECHO_TYPE_UUID, value); + + ignite::Guid res = compute.ExecuteJavaTask(ECHO_TASK, ECHO_TYPE_UUID); + + BOOST_CHECK_EQUAL(value, res); + } +} + +BOOST_AUTO_TEST_CASE(EchoTaskGuidAsync) +{ + Compute compute = node.GetCompute(); + Cache cache = GetDefaultCache(); + + for (int32_t i = 0; i < 100; ++i) + { + Guid value(i * 479001599LL, i * 150209LL); + + cache.Put(ECHO_TYPE_UUID, value); + + ignite::Guid res = compute.ExecuteJavaTaskAsync(ECHO_TASK, ECHO_TYPE_UUID).GetValue(); + + BOOST_CHECK_EQUAL(value, res); + } +} + +BOOST_AUTO_TEST_CASE(ClusterBasic) +{ + Ignite node2 = MakeNode(1); + + Compute compute = node.GetCompute(node.GetCluster().ForLocal()); + + for (int32_t i = 0; i < 100; ++i) + { + std::string res = compute.ExecuteJavaTask(NODE_NAME_TASK); + + BOOST_CHECK_EQUAL(std::string(node.GetName()), res); + } +} + +BOOST_AUTO_TEST_CASE(ClusterBasicAsync) +{ + Ignite node2 = MakeNode(1); + + Compute compute = node.GetCompute(node.GetCluster().ForLocal()); + + for (int32_t i = 0; i < 100; ++i) + { + std::string res = compute.ExecuteJavaTaskAsync(NODE_NAME_TASK).GetValue(); + + BOOST_CHECK_EQUAL(std::string(node.GetName()), res); + } +} + +BOOST_AUTO_TEST_CASE(ClusterPredicate) +{ + Ignite node2 = MakeNode(1); + + ClusterGroup grp = node.GetCluster().AsClusterGroup().ForPredicate( + new ignite_test::HasAttrValue("TestAttribute", "Value0")); + + Compute compute = node.GetCompute(grp); + + for (int32_t i = 0; i < 100; ++i) + { + std::string res = compute.ExecuteJavaTask(NODE_NAME_TASK); + + BOOST_CHECK_EQUAL(std::string(node.GetName()), res); + } +} + +BOOST_AUTO_TEST_CASE(ClusterPredicateAsync) +{ + Ignite node2 = MakeNode(1); + + ClusterGroup grp = node.GetCluster().AsClusterGroup().ForPredicate( + new ignite_test::HasAttrValue("TestAttribute", "Value0")); + + Compute compute = node.GetCompute(grp); + + for (int32_t i = 0; i < 100; ++i) + { + std::string res = compute.ExecuteJavaTaskAsync(NODE_NAME_TASK).GetValue(); + + BOOST_CHECK_EQUAL(std::string(node.GetName()), res); + } +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/modules/platforms/cpp/core/include/ignite/cluster/cluster_group.h b/modules/platforms/cpp/core/include/ignite/cluster/cluster_group.h index bfb0ad605e928..1960c7741c8f0 100644 --- a/modules/platforms/cpp/core/include/ignite/cluster/cluster_group.h +++ b/modules/platforms/cpp/core/include/ignite/cluster/cluster_group.h @@ -29,6 +29,11 @@ namespace ignite { + namespace impl + { + class IgniteImpl; + } + namespace cluster { /** @@ -40,6 +45,7 @@ namespace ignite class IGNITE_IMPORT_EXPORT ClusterGroup { friend class impl::cluster::ClusterGroupImpl; + friend class impl::IgniteImpl; public: /** * Constructor. diff --git a/modules/platforms/cpp/core/include/ignite/compute/compute.h b/modules/platforms/cpp/core/include/ignite/compute/compute.h index ac0b32acdec11..a9767cd38df40 100644 --- a/modules/platforms/cpp/core/include/ignite/compute/compute.h +++ b/modules/platforms/cpp/core/include/ignite/compute/compute.h @@ -306,6 +306,70 @@ namespace ignite return impl.Get()->BroadcastAsync(func); } + /** + * Executes given Java task on the grid projection. If task for given name has not been deployed yet, + * then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @param taskArg Argument of task execution of type A. + * @return Task result of type @c R. + * + * @tparam R Type of task result. + * @tparam A Type of task argument. + */ + template + R ExecuteJavaTask(const std::string& taskName, const A& taskArg) + { + return impl.Get()->ExecuteJavaTask(taskName, taskArg); + } + + /** + * Executes given Java task on the grid projection. If task for given name has not been deployed yet, + * then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @return Task result of type @c R. + * + * @tparam R Type of task result. + */ + template + R ExecuteJavaTask(const std::string& taskName) + { + return impl.Get()->ExecuteJavaTask(taskName); + } + + /** + * Asynchronously executes given Java task on the grid projection. If task for given name has not been + * deployed yet, then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @param taskArg Argument of task execution of type A. + * @return Future containing a result of type @c R. + * + * @tparam R Type of task result. + * @tparam A Type of task argument. + */ + template + Future ExecuteJavaTaskAsync(const std::string& taskName, const A& taskArg) + { + return impl.Get()->ExecuteJavaTaskAsync(taskName, taskArg); + } + + /** + * Asynchronously executes given Java task on the grid projection. If task for given name has not been + * deployed yet, then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @return Future containing a result of type @c R. + * + * @tparam R Type of task result. + */ + template + Future ExecuteJavaTaskAsync(const std::string& taskName) + { + return impl.Get()->ExecuteJavaTaskAsync(taskName); + } + private: /** Implementation. */ common::concurrent::SharedPointer impl; diff --git a/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h index a71ce3cfb0b84..7ffc9cba028ee 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cluster/cluster_group_impl.h @@ -24,7 +24,6 @@ #include #include -#include #include namespace ignite @@ -53,7 +52,6 @@ namespace ignite class IGNITE_FRIEND_EXPORT ClusterGroupImpl : private interop::InteropTarget { typedef common::concurrent::SharedPointer SP_IgniteEnvironment; - typedef common::concurrent::SharedPointer SP_ComputeImpl; typedef common::concurrent::SharedPointer > SP_ClusterNodes; typedef common::concurrent::SharedPointer SP_PredicateHolder; public: @@ -265,20 +263,6 @@ namespace ignite */ std::vector GetNodes(); - /** - * Get compute instance over this cluster group. - * - * @return Pointer to compute instance. - */ - SP_ComputeImpl GetCompute(); - - /** - * Get compute instance over specified cluster group. - * - * @return Pointer to compute instance. - */ - SP_ComputeImpl GetCompute(ignite::cluster::ClusterGroup grp); - /** * Check if the Ignite grid is active. * @@ -347,6 +331,13 @@ namespace ignite */ IgnitePredicate* GetPredicate(); + /** + * Get predicate that defines a subset of nodes for this cluster group. + * + * @return Pointer to predicate. + */ + const IgnitePredicate* GetPredicate() const; + /** * Get a topology by version. * @@ -362,6 +353,13 @@ namespace ignite */ int64_t GetTopologyVersion(); + /** + * Get a compute processor for the cluster group. + * + * @return Compute processor instance reference. + */ + jobject GetComputeProcessor(); + private: IGNITE_NO_COPY_ASSIGNMENT(ClusterGroupImpl); @@ -390,13 +388,6 @@ namespace ignite */ SP_ClusterGroupImpl FromTarget(jobject javaRef); - /** - * Get instance of compute internally. - * - * @return Pointer to compute. - */ - SP_ComputeImpl InternalGetCompute(); - /** * Read cluster nodes from stream. * @@ -414,12 +405,12 @@ namespace ignite /** * Get container of refreshed cluster nodes over this cluster group. * - * @return Instance of compute. + * @return Cluster nodes. */ std::vector RefreshNodes(); - /** Compute for the cluster group. */ - SP_ComputeImpl computeImpl; + /** Cluster node predicate. */ + SP_PredicateHolder predHolder; /** Cluster nodes. */ SP_ClusterNodes nodes; @@ -429,9 +420,6 @@ namespace ignite /** Cluster nodes top version. */ int64_t topVer; - - /** Cluster node predicate. */ - SP_PredicateHolder predHolder; }; } } diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h index 6f95743f36a47..bd07ea4d7437d 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h @@ -25,7 +25,12 @@ #include #include + +#include + #include +#include +#include #include #include #include @@ -51,6 +56,10 @@ namespace ignite { BROADCAST = 2, + EXEC = 3, + + EXEC_ASYNC = 4, + UNICAST = 5, AFFINITY_CALL = 13, @@ -63,9 +72,10 @@ namespace ignite * Constructor. * * @param env Environment. - * @param javaRef Java object reference. + * @param clusterGroup Cluster group for the compute. */ - ComputeImpl(common::concurrent::SharedPointer env, jobject javaRef); + ComputeImpl(common::concurrent::SharedPointer env, + cluster::SP_ClusterGroupImpl clusterGroup); /** * Executes given job asynchronously on the node where data for @@ -117,7 +127,7 @@ namespace ignite } /** - * Asyncronuously calls provided ComputeFunc on a node within + * Asynchronously calls provided ComputeFunc on a node within * the underlying cluster group. * * @tparam F Compute function type. Should implement @@ -139,7 +149,7 @@ namespace ignite } /** - * Asyncronuously runs provided ComputeFunc on a node within + * Asynchronously runs provided ComputeFunc on a node within * the underlying cluster group. * * @tparam F Compute action type. Should implement @@ -158,7 +168,7 @@ namespace ignite } /** - * Asyncronuously broadcasts provided ComputeFunc to all nodes + * Asynchronously broadcasts provided ComputeFunc to all nodes * in the underlying cluster group. * * @tparam F Compute function type. Should implement @@ -180,7 +190,7 @@ namespace ignite } /** - * Asyncronuously broadcasts provided ComputeFunc to all nodes + * Asynchronously broadcasts provided ComputeFunc to all nodes * in the underlying cluster group. * * @tparam F Compute function type. Should implement @@ -198,7 +208,215 @@ namespace ignite return PerformTask(Operation::BROADCAST, func); } + /** + * Executes given Java task on the grid projection. If task for given name has not been deployed yet, + * then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @param taskArg Argument of task execution of type A. + * @return Task result of type @c R. + * + * @tparam R Type of task result. + * @tparam A Type of task argument. + */ + template + R ExecuteJavaTask(const std::string& taskName, const A& taskArg) + { + return PerformJavaTask(taskName, &taskArg); + } + + /** + * Executes given Java task on the grid projection. If task for given name has not been deployed yet, + * then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @return Task result of type @c R. + * + * @tparam R Type of task result. + */ + template + R ExecuteJavaTask(const std::string& taskName) + { + return PerformJavaTask(taskName, 0); + } + + /** + * Asynchronously executes given Java task on the grid projection. If task for given name has not been + * deployed yet, then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @param taskArg Argument of task execution of type A. + * @return Future containing a result of type @c R. + * + * @tparam R Type of task result. + * @tparam A Type of task argument. + */ + template + Future ExecuteJavaTaskAsync(const std::string& taskName, const A& taskArg) + { + return PerformJavaTaskAsync(taskName, &taskArg); + } + + /** + * Asynchronously executes given Java task on the grid projection. If task for given name has not been + * deployed yet, then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @return Future containing a result of type @c R. + * + * @tparam R Type of task result. + */ + template + Future ExecuteJavaTaskAsync(const std::string& taskName) + { + return PerformJavaTaskAsync(taskName, 0); + } + private: + IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl); + + struct FutureType + { + enum Type + { + F_BYTE = 1, + F_BOOL = 2, + F_SHORT = 3, + F_CHAR = 4, + F_INT = 5, + F_FLOAT = 6, + F_LONG = 7, + F_DOUBLE = 8, + F_OBJECT = 9, + }; + }; + + template struct FutureTypeForType { static const int32_t value = FutureType::F_OBJECT; }; + + /** + * @return True if projection for the compute contains predicate. + */ + bool ProjectionContainsPredicate() const; + + /** + * @return Nodes for the compute. + */ + std::vector GetNodes(); + + /** + * Write Java task using provided writer. If task for given name has not been deployed yet, + * then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @param taskArg Argument of task execution of type A. + * @param writer Binary writer. + * @return Task result of type @c R. + * + * @tparam R Type of task result. + * @tparam A Type of task argument. + */ + template + void WriteJavaTask(const std::string& taskName, const A* arg, binary::BinaryWriterImpl& writer) { + writer.WriteString(taskName); + + // Keep binary flag + writer.WriteBool(false); + if (arg) + writer.WriteObject(*arg); + else + writer.WriteNull(); + + if (!ProjectionContainsPredicate()) + writer.WriteBool(false); + else + { + typedef std::vector ClusterNodes; + ClusterNodes nodes = GetNodes(); + + writer.WriteBool(true); + writer.WriteInt32(static_cast(nodes.size())); + for (ClusterNodes::iterator it = nodes.begin(); it != nodes.end(); ++it) + writer.WriteGuid(it->GetId()); + } + } + + /** + * Executes given Java task on the grid projection. If task for given name has not been deployed yet, + * then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @param taskArg Argument of task execution of type A. + * @return Task result of type @c R. + * + * @tparam R Type of task result. + * @tparam A Type of task argument. + */ + template + R PerformJavaTask(const std::string& taskName, const A* arg) + { + using namespace common::concurrent; + + SharedPointer memIn = GetEnvironment().AllocateMemory(); + interop::InteropOutputStream out(memIn.Get()); + binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + + WriteJavaTask(taskName, arg, writer); + + out.Synchronize(); + + SharedPointer memOut = GetEnvironment().AllocateMemory(); + + IgniteError err; + InStreamOutStream(Operation::EXEC, *memIn.Get(), *memOut.Get(), err); + IgniteError::ThrowIfNeeded(err); + + interop::InteropInputStream inStream(memOut.Get()); + binary::BinaryReaderImpl reader(&inStream); + + return reader.ReadObject(); + } + + /** + * Executes given Java task on the grid projection. If task for given name has not been deployed yet, + * then 'taskName' will be used as task class name to auto-deploy the task. + * + * @param taskName Java task name. + * @param arg Argument of task execution of type A. + * @return Task result of type @c R. + * + * @tparam R Type of task result. + * @tparam A Type of task argument. + */ + template + Future PerformJavaTaskAsync(const std::string& taskName, const A* arg) + { + typedef JavaComputeTaskHolder TaskHolder; + common::concurrent::SharedPointer task(new TaskHolder()); + int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task); + + common::concurrent::SharedPointer mem = GetEnvironment().AllocateMemory(); + interop::InteropOutputStream out(mem.Get()); + binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + + WriteJavaTask(taskName, arg, writer); + + writer.WriteInt64(taskHandle); + writer.WriteInt32(FutureTypeForType::value); + + out.Synchronize(); + + IgniteError err; + jobject target = InStreamOutObject(Operation::EXEC_ASYNC, *mem.Get(), err); + IgniteError::ThrowIfNeeded(err); + + std::auto_ptr cancelable(new CancelableImpl(GetEnvironmentPointer(), target)); + + common::Promise& promise = task.Get()->GetPromise(); + promise.SetCancelTarget(cancelable); + + return promise.GetFuture(); + } + /** * Perform job. * @@ -328,7 +546,40 @@ namespace ignite return promise.GetFuture(); } - IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl); + /** Cluster group */ + cluster::SP_ClusterGroupImpl clusterGroup; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_BYTE; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_BOOL; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_SHORT; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_CHAR; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_INT; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_LONG; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_FLOAT; + }; + + template<> struct IGNITE_IMPORT_EXPORT ComputeImpl::FutureTypeForType { + static const int32_t value = FutureType::F_DOUBLE; }; } } diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h index 8438c1d21c9c2..5eb3f70ffca87 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h @@ -25,6 +25,7 @@ #include +#include #include namespace ignite @@ -83,13 +84,24 @@ namespace ignite virtual void JobResultError(const IgniteError& err) = 0; /** - * Process successfull result. + * Process successful result. + * + * @param value Value. + */ + virtual void JobResultSuccess(int64_t value) = 0; + + /** + * Process successful result. * * @param reader Reader for stream with result. - * @param err Error. */ virtual void JobResultSuccess(binary::BinaryReaderImpl& reader) = 0; + /** + * Process successful result. + */ + virtual void JobNullResultSuccess() = 0; + /** * Reduce results of related jobs. */ @@ -109,6 +121,65 @@ namespace ignite /** Related job handle. */ int64_t handle; }; + + + /** + * Read future result. + * @tparam T Type of the result. + * @param value Value. + * @return Result. + */ + template T PrimitiveFutureResult(int64_t value) + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, + "Primitive value passed to non-primitive future", "value", value); + } + + template<> inline int8_t PrimitiveFutureResult(int64_t value) + { + return static_cast(value); + } + + template<> inline int16_t PrimitiveFutureResult(int64_t value) + { + return static_cast(value); + } + + template<> inline int32_t PrimitiveFutureResult(int64_t value) + { + return static_cast(value); + } + + template<> inline int64_t PrimitiveFutureResult(int64_t value) + { + return static_cast(value); + } + + template<> inline bool PrimitiveFutureResult(int64_t value) + { + return value != 0; + } + + template<> inline uint16_t PrimitiveFutureResult(int64_t value) + { + return static_cast(value); + } + + template<> inline float PrimitiveFutureResult(int64_t value) + { + impl::interop::BinaryFloatInt32 u; + + u.i = static_cast(value); + return u.f; + } + + template<> inline double PrimitiveFutureResult(int64_t value) + { + impl::interop::BinaryDoubleInt64 u; + + u.i = value; + return u.d; + } } } } diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/java_compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/java_compute_task_holder.h new file mode 100644 index 0000000000000..ccfd5d87c96b0 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/java_compute_task_holder.h @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::compute::JavaComputeTaskHolder class template. + */ + +#ifndef _IGNITE_IMPL_COMPUTE_JAVA_COMPUTE_TASK_HOLDER +#define _IGNITE_IMPL_COMPUTE_JAVA_COMPUTE_TASK_HOLDER + +#include + +#include +#include +#include + +namespace ignite +{ + namespace impl + { + namespace compute + { + /** + * Compute task holder type-specific implementation. + */ + template + class JavaComputeTaskHolder : public ComputeTaskHolder + { + public: + typedef R ResultType; + + /** + * Constructor. + */ + JavaComputeTaskHolder() : + ComputeTaskHolder(-1) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~JavaComputeTaskHolder() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder&) + { + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(binary::BinaryReaderImpl&) + { + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param reader Reader for stream with result. + */ + virtual void JobResultError(const IgniteError& err) + { + res.SetError(err); + } + + /** + * Process successful result. + * + * @param value Value. + */ + virtual void JobResultSuccess(int64_t value) + { + res.SetResult(PrimitiveFutureResult(value)); + } + + /** + * Process successful result. + * + * @param reader Reader for stream with result. + */ + virtual void JobResultSuccess(binary::BinaryReaderImpl& reader) + { + res.SetResult(reader.ReadObject()); + } + + /** + * Process successful null result. + */ + virtual void JobNullResultSuccess() + { + res.SetResult(impl::binary::BinaryUtils::GetDefaultValue()); + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + res.SetPromise(promise); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise& GetPromise() + { + return promise; + } + + private: + /** Result. */ + ComputeJobResult res; + + /** Task result promise. */ + common::Promise promise; + }; + + /** + * Compute task holder type-specific implementation. + */ + template<> + class JavaComputeTaskHolder : public ComputeTaskHolder + { + public: + /** + * Constructor. + */ + JavaComputeTaskHolder() : + ComputeTaskHolder(-1) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~JavaComputeTaskHolder() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder&) + { + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(binary::BinaryReaderImpl&) + { + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param reader Reader for stream with result. + */ + virtual void JobResultError(const IgniteError& err) + { + res.SetError(err); + } + + /** + * Process successful result. + * + * @param value Value. + */ + virtual void JobResultSuccess(int64_t) + { + res.SetResult(); + } + + /** + * Process successful result. + * + * @param reader Reader for stream with result. + */ + virtual void JobResultSuccess(binary::BinaryReaderImpl&) + { + res.SetResult(); + } + + /** + * Process successful null result. + */ + virtual void JobNullResultSuccess() + { + res.SetResult(); + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + res.SetPromise(promise); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise& GetPromise() + { + return promise; + } + + private: + /** Result. */ + ComputeJobResult res; + + /** Task result promise. */ + common::Promise promise; + }; + } + } +} + +#endif //_IGNITE_IMPL_COMPUTE_JAVA_COMPUTE_TASK_HOLDER diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h index cab8632d064ed..7471030ef2ef0 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h @@ -110,7 +110,6 @@ namespace ignite * Process remote job result. * * @param reader Reader for stream with result. - * @return Policy. */ virtual void JobResultError(const IgniteError& err) { @@ -122,10 +121,23 @@ namespace ignite } /** - * Process successfull result. + * Process successful result. + * + * @param value Value. + */ + virtual void JobResultSuccess(int64_t value) + { + ComputeJobResult res; + + res.SetResult(PrimitiveFutureResult(value)); + + ProcessResult(res); + } + + /** + * Process successful result. * * @param reader Reader for stream with result. - * @param err Error. */ virtual void JobResultSuccess(binary::BinaryReaderImpl& reader) { @@ -136,6 +148,18 @@ namespace ignite ProcessResult(res); } + /** + * Process successful null result. + */ + virtual void JobNullResultSuccess() + { + ComputeJobResult res; + + res.SetResult(impl::binary::BinaryUtils::GetDefaultValue()); + + ProcessResult(res); + } + /** * Reduce results of related jobs. */ @@ -249,7 +273,6 @@ namespace ignite * Process remote job result. * * @param reader Reader for stream with result. - * @return Policy. */ virtual void JobResultError(const IgniteError& err) { @@ -261,10 +284,23 @@ namespace ignite } /** - * Process successfull result. + * Process successful result. + * + * @param value Value. + */ + virtual void JobResultSuccess(int64_t) + { + ComputeJobResult res; + + res.SetResult(); + + ProcessResult(res); + } + + /** + * Process successful result. * * @param reader Reader for stream with result. - * @param err Error. */ virtual void JobResultSuccess(binary::BinaryReaderImpl&) { @@ -275,6 +311,18 @@ namespace ignite ProcessResult(res); } + /** + * Process successful null result. + */ + virtual void JobNullResultSuccess() + { + ComputeJobResult res; + + res.SetResult(); + + ProcessResult(res); + } + /** * Reduce results of related jobs. */ diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h index 8f289295ade38..6b6b2ba47bf49 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h @@ -98,7 +98,6 @@ namespace ignite * Process remote job result. * * @param reader Reader for stream with result. - * @return Policy. */ virtual void JobResultError(const IgniteError& err) { @@ -106,16 +105,33 @@ namespace ignite } /** - * Process successfull result. + * Process successful result. + * + * @param value Value. + */ + virtual void JobResultSuccess(int64_t value) + { + res.SetResult(PrimitiveFutureResult(value)); + } + + /** + * Process successful result. * * @param reader Reader for stream with result. - * @param err Error. */ virtual void JobResultSuccess(binary::BinaryReaderImpl& reader) { res.SetResult(reader.ReadObject()); } + /** + * Process successful null result. + */ + virtual void JobNullResultSuccess() + { + res.SetResult(impl::binary::BinaryUtils::GetDefaultValue()); + } + /** * Reduce results of related jobs. */ @@ -204,7 +220,6 @@ namespace ignite * Process remote job result. * * @param reader Reader for stream with result. - * @return Policy. */ virtual void JobResultError(const IgniteError& err) { @@ -212,16 +227,33 @@ namespace ignite } /** - * Process successfull result. + * Process successful result. + * + * @param value Value. + */ + virtual void JobResultSuccess(int64_t) + { + res.SetResult(); + } + + /** + * Process successful result. * * @param reader Reader for stream with result. - * @param err Error. */ virtual void JobResultSuccess(binary::BinaryReaderImpl&) { res.SetResult(); } + /** + * Process successful null result. + */ + virtual void JobNullResultSuccess() + { + res.SetResult(); + } + /** * Reduce results of related jobs. */ diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h index 3f697d3dcbd0c..5c2fc905fae7c 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h @@ -138,15 +138,30 @@ namespace ignite int64_t OnContinuousQueryFilterApply(common::concurrent::SharedPointer& mem); /** - * Callback on future result recieved. + * Callback on future result received. + * + * @param handle Task handle. + * @param value Value. + */ + int64_t OnFuturePrimitiveResult(int64_t handle, int64_t value); + + /** + * Callback on future result received. * * @param handle Task handle. * @param mem Memory with data. */ - int64_t OnFutureResult(int64_t handle, common::concurrent::SharedPointer &mem); + int64_t OnFutureObjectResult(int64_t handle, common::concurrent::SharedPointer &mem); + + /** + * Callback on future null result received. + * + * @param handle Task handle. + */ + int64_t OnFutureNullResult(int64_t handle); /** - * Callback on future error recieved. + * Callback on future error received. * * @param handle Task handle. * @param mem Memory with data. diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj index 0f3d94425745c..db0455c047744 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj @@ -240,6 +240,7 @@ + diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters index ac789de16471a..3853390552102 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters @@ -255,6 +255,9 @@ Code\impl\compute + + Code\impl\compute + Code\impl\compute diff --git a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp index a313358fafa2b..85093918fd961 100644 --- a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp @@ -152,10 +152,13 @@ namespace ignite }; ClusterGroupImpl::ClusterGroupImpl(SP_IgniteEnvironment env, jobject javaRef) : - InteropTarget(env, javaRef), nodes(new std::vector()), nodesLock(), topVer(0), - predHolder(new ClusterNodePredicateHolder) + InteropTarget(env, javaRef), + predHolder(new ClusterNodePredicateHolder), + nodes(), + nodesLock(), + topVer(0) { - computeImpl = InternalGetCompute(); + // No-op. } ClusterGroupImpl::~ClusterGroupImpl() @@ -480,16 +483,6 @@ namespace ignite return RefreshNodes(); } - ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::GetCompute() - { - return computeImpl; - } - - ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::GetCompute(ClusterGroup grp) - { - return grp.GetImpl().Get()->GetCompute(); - } - bool ClusterGroupImpl::IsActive() { IgniteError err; @@ -558,6 +551,11 @@ namespace ignite return predHolder.Get(); } + const IgnitePredicate* ClusterGroupImpl::GetPredicate() const + { + return predHolder.Get(); + } + std::vector ClusterGroupImpl::GetTopology(int64_t version) { SharedPointer memIn = GetEnvironment().AllocateMemory(); @@ -618,11 +616,9 @@ namespace ignite return SP_ClusterGroupImpl(new ClusterGroupImpl(GetEnvironmentPointer(), javaRef)); } - ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::InternalGetCompute() + jobject ClusterGroupImpl::GetComputeProcessor() { - jobject computeProc = GetEnvironment().GetProcessorCompute(GetTarget()); - - return SP_ComputeImpl(new compute::ComputeImpl(GetEnvironmentPointer(), computeProc)); + return GetEnvironment().GetProcessorCompute(GetTarget()); } ClusterGroupImpl::SP_ClusterNodes ClusterGroupImpl::ReadNodes(binary::BinaryReaderImpl& reader) diff --git a/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp index 591dd1f6f85b3..e629dd38053eb 100644 --- a/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp @@ -25,11 +25,22 @@ namespace ignite { namespace compute { - ComputeImpl::ComputeImpl(SharedPointer env, jobject javaRef) : - InteropTarget(env, javaRef) + ComputeImpl::ComputeImpl(SharedPointer env, cluster::SP_ClusterGroupImpl clusterGroup) : + InteropTarget(env, clusterGroup.Get()->GetComputeProcessor()), + clusterGroup(clusterGroup) { // No-op. } + + bool ComputeImpl::ProjectionContainsPredicate() const + { + return clusterGroup.IsValid() && clusterGroup.Get()->GetPredicate() != 0; + } + + std::vector ComputeImpl::GetNodes() + { + return clusterGroup.Get()->GetNodes(); + } } } } \ No newline at end of file diff --git a/modules/platforms/cpp/core/src/impl/handle_registry.cpp b/modules/platforms/cpp/core/src/impl/handle_registry.cpp index a27976298ae32..76f2e519e0da1 100644 --- a/modules/platforms/cpp/core/src/impl/handle_registry.cpp +++ b/modules/platforms/cpp/core/src/impl/handle_registry.cpp @@ -121,7 +121,9 @@ namespace ignite void HandleRegistry::Release(int64_t hnd) { - if (hnd < fastCap) + if (hnd < 0) + return; + else if (hnd < fastCap) fast[static_cast(hnd)] = SharedPointer(); else { diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp index bb2d9dc0d3520..894add516053e 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp @@ -59,6 +59,14 @@ namespace ignite CONTINUOUS_QUERY_FILTER_CREATE = 19, CONTINUOUS_QUERY_FILTER_APPLY = 20, CONTINUOUS_QUERY_FILTER_RELEASE = 21, + FUTURE_BYTE_RESULT = 24, + FUTURE_BOOL_RESULT = 25, + FUTURE_SHORT_RESULT = 26, + FUTURE_CHAR_RESULT = 27, + FUTURE_INT_RESULT = 28, + FUTURE_FLOAT_RESULT = 29, + FUTURE_LONG_RESULT = 30, + FUTURE_DOUBLE_RESULT = 31, FUTURE_OBJECT_RESULT = 32, FUTURE_NULL_RESULT = 33, FUTURE_ERROR = 34, @@ -268,9 +276,7 @@ namespace ignite case OperationCallback::FUTURE_NULL_RESULT: { - SharedPointer mem = env->Get()->AllocateMemory(); - - env->Get()->OnFutureResult(val, mem); + env->Get()->OnFutureNullResult(val); break; } @@ -350,11 +356,25 @@ namespace ignite break; } + case OperationCallback::FUTURE_BYTE_RESULT: + case OperationCallback::FUTURE_BOOL_RESULT: + case OperationCallback::FUTURE_SHORT_RESULT: + case OperationCallback::FUTURE_CHAR_RESULT: + case OperationCallback::FUTURE_INT_RESULT: + case OperationCallback::FUTURE_LONG_RESULT: + case OperationCallback::FUTURE_FLOAT_RESULT: + case OperationCallback::FUTURE_DOUBLE_RESULT: + { + env->Get()->OnFuturePrimitiveResult(val1, val2); + + break; + } + case OperationCallback::FUTURE_OBJECT_RESULT: { SharedPointer mem = env->Get()->GetMemory(val2); - env->Get()->OnFutureResult(val1, mem); + env->Get()->OnFutureObjectResult(val1, mem); break; } @@ -814,7 +834,22 @@ namespace ignite return res ? 1 : 0; } - int64_t IgniteEnvironment::OnFutureResult(int64_t handle, SharedPointer& mem) + int64_t IgniteEnvironment::OnFuturePrimitiveResult(int64_t handle, int64_t value) + { + SharedPointer task0 = + StaticPointerCast(registry.Get(handle)); + + registry.Release(handle); + + compute::ComputeTaskHolder* task = task0.Get(); + + task->JobResultSuccess(value); + task->Reduce(); + + return 1; + } + + int64_t IgniteEnvironment::OnFutureObjectResult(int64_t handle, SharedPointer& mem) { InteropInputStream inStream(mem.Get()); BinaryReaderImpl reader(&inStream); @@ -832,6 +867,21 @@ namespace ignite return 1; } + int64_t IgniteEnvironment::OnFutureNullResult(int64_t handle) + { + SharedPointer task0 = + StaticPointerCast(registry.Get(handle)); + + registry.Release(handle); + + compute::ComputeTaskHolder* task = task0.Get(); + + task->JobNullResultSuccess(); + task->Reduce(); + + return 1; + } + int64_t IgniteEnvironment::OnFutureError(int64_t handle, SharedPointer& mem) { InteropInputStream inStream(mem.Get()); diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp index 5925027e24ec6..522d612e33e01 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp @@ -130,12 +130,14 @@ namespace ignite { SP_ClusterGroupImpl serversCluster = prjImpl.Get().Get()->ForServers(); - return serversCluster.Get()->GetCompute(); + return GetCompute(serversCluster); } IgniteImpl::SP_ComputeImpl IgniteImpl::GetCompute(ClusterGroup grp) { - return this->GetProjection().Get()->GetCompute(grp); + SP_ClusterGroupImpl grpImpl = grp.GetImpl(); + + return SP_ComputeImpl(new compute::ComputeImpl(GetEnvironmentPointer(), grpImpl)); } void IgniteImpl::DisableWal(std::string cacheName) diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/app/application_data_buffer.h b/modules/platforms/cpp/odbc/include/ignite/odbc/app/application_data_buffer.h index 774c5717084f5..50d32448835b7 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/app/application_data_buffer.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/app/application_data_buffer.h @@ -44,13 +44,13 @@ namespace ignite { enum Type { - /** Conversion successfull. No data lost. */ + /** Conversion successful. No data lost. */ AI_SUCCESS, - /** Conversion successfull, but fractional truncation occurred. */ + /** Conversion successful, but fractional truncation occurred. */ AI_FRACTIONAL_TRUNCATED, - /** Conversion successfull, but right-side variable length data truncation occurred. */ + /** Conversion successful, but right-side variable length data truncation occurred. */ AI_VARLEN_DATA_TRUNCATED, /** Conversion is not supported. */