diff --git a/janusgraph-couchbase/pom.xml b/janusgraph-couchbase/pom.xml
new file mode 100644
index 00000000000..a884bc2b4b6
--- /dev/null
+++ b/janusgraph-couchbase/pom.xml
@@ -0,0 +1,257 @@
+
+
+ 4.0.0
+
+ org.janusgraph
+ janusgraph
+ 1.1.0-SNAPSHOT
+ ../pom.xml
+
+
+ janusgraph-couchbase
+ JanusGraph-Couchbase: Couchbase Backend for JanusGraph
+
+
+ 3.8.1
+ 1.8
+ 1.8
+ 1.8
+ 2.3.4
+ 3.3.4
+ 9.3.0
+ 2.10.3
+ 3.2.0
+ false
+ true
+ 1.15.3
+
+
+
+
+ Jagadesh Munta
+ jagadesh.munta@couchbase.com
+
+
+ Denis Souza Rosa
+ denis.rosa@couchbase.com
+
+
+ Dmitrii Chechetkin
+ dmitrii.chechetkin@couchbase.com
+
+
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+
+ org.janusgraph
+ janusgraph-core
+ ${project.version}
+
+
+ org.janusgraph
+ janusgraph-server
+ ${project.version}
+ provided
+
+
+ ch.qos.logback
+ logback-classic
+ 0.9.24
+ provided
+
+
+ org.janusgraph
+ janusgraph-backend-testutils
+ ${project.version}
+ test
+
+
+ org.janusgraph
+ janusgraph-test
+ ${project.version}
+ test
+
+
+ com.couchbase.client
+ core-io
+ ${couchbase.core-io.version}
+
+
+ com.couchbase.client
+ java-client
+ ${couchbase.java-client.version}
+
+
+ org.apache.lucene
+ lucene-queryparser
+ ${lucene-parser.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson-databind.version}
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.6
+
+
+ io.reactivex
+ rxjava
+ 1.3.8
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+ org.mockito
+ mockito-inline
+ 3.12.4
+ test
+
+
+ org.powermock
+ powermock-core
+ 2.0.9
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ 2.0.9
+ test
+
+
+ org.powermock
+ powermock-module-junit4
+ 2.0.9
+ test
+
+
+ org.powermock
+ powermock-reflect
+ 2.0.9
+ test
+
+
+
+
+ ${basedir}/target
+
+
+ ${basedir}/src/test/resources
+
+
+
+
+ maven-compiler-plugin
+ ${maven.compiler.plugin.version}
+
+
+ ${jdk.version}
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ ${dependency.plugin.version}
+
+
+ copy-dependencies
+ prepare-package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/lib
+ compile
+
+
+
+
+
+ maven-surefire-plugin
+ 2.22.1
+
+ none
+ alphabetical
+ false
+
+ **/*PerformanceTest.java
+ **/*ConcurrentTest.java
+ **/*Groovy*Test.java
+ **/*ComputerTest.java
+ **/*ProcessTest.java
+ **/*ProcessPerformanceTest.java
+ **/*StructureTest.java
+
+ ${test.skip.default}
+
+
+
+ log4j.configuration
+ file:${project.build.directory}/test-classes/log4j.properties
+
+
+
+
+
+ tinkerpop-test
+
+ test
+
+ test
+
+ false
+ 1
+ none
+ 1
+ false
+
+ **/*Groovy*Test.java
+ **/*ComputerTest.java
+ **/*ProcessTest.java
+ **/*ProcessPerformanceTest.java
+ **/*StructureTest.java
+
+ alphabetical
+ ${test.skip.tp}
+
+ ${project.build.directory}
+ file:${project.build.directory}/test-classes/log4j.properties
+ true
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ ${jdk.version}
+
+
+
+
+
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/AbstractDocument.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/AbstractDocument.java
new file mode 100644
index 00000000000..cc9c64d8a29
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/AbstractDocument.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import com.couchbase.client.core.msg.kv.MutationToken;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Common parent implementation of a {@link Document}.
+ *
+ * It is recommended that all {@link Document} implementations extend from this class so that parameter checks
+ * are consistently applied. It also ensures that equals and hashcode are applied on the contents and therefore
+ * comparisons work as expected.
+ *
+ * @author Michael Nitschinger
+ * @since 2.0.0
+ */
+public abstract class AbstractDocument implements Document {
+
+ public static final int MAX_ID_LENGTH = 240;
+ private String id;
+ private long cas;
+ private int expiry;
+ private T content;
+ private MutationToken mutationToken;
+
+ /**
+ * Constructor needed for possible subclass serialization.
+ */
+ protected AbstractDocument() {
+ }
+
+ protected AbstractDocument(String id, int expiry, T content, long cas) {
+ this(id, expiry, content, cas, null);
+ }
+
+ protected AbstractDocument(String id, int expiry, T content, long cas, MutationToken mutationToken) {
+ if (id == null || id.isEmpty()) {
+ throw new IllegalArgumentException("The Document ID must not be null or empty.");
+ }
+ // Quick sanity check, but not 100% accurate. UTF-8 encoding avoided because of double
+ // allocations, it is done in core with proper exact error handling anyways.
+ if (id.length() > MAX_ID_LENGTH) {
+ throw new IllegalArgumentException("The Document ID must not be larger than 250 bytes");
+ }
+ if (expiry < 0) {
+ throw new IllegalArgumentException("The Document expiry must not be negative.");
+ }
+
+ this.id = id;
+ this.cas = cas;
+ this.expiry = expiry;
+ this.content = content;
+ this.mutationToken = mutationToken;
+ }
+
+ @Override
+ public String id() {
+ return id;
+ }
+
+ @Override
+ public long cas() {
+ return cas;
+ }
+
+ @Override
+ public int expiry() {
+ return expiry;
+ }
+
+ @Override
+ public T content() {
+ return content;
+ }
+
+ @Override
+ public MutationToken mutationToken() {
+ return mutationToken;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(this.getClass().getSimpleName() + "{");
+ sb.append("id='").append(id).append('\'');
+ sb.append(", cas=").append(cas);
+ sb.append(", expiry=").append(expiry);
+ sb.append(", content=").append(content);
+ sb.append(", mutationToken=").append(mutationToken);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AbstractDocument> that = (AbstractDocument>) o;
+
+ if (cas != that.cas) return false;
+ if (expiry != that.expiry) return false;
+ if (id != null ? !id.equals(that.id) : that.id != null) return false;
+ if (content != null ? !content.equals(that.content) : that.content != null) return false;
+ return !(mutationToken != null ? !mutationToken.equals(that.mutationToken) : that.mutationToken != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = id != null ? id.hashCode() : 0;
+ result = 31 * result + (int) (cas ^ (cas >>> 32));
+ result = 31 * result + expiry;
+ result = 31 * result + (content != null ? content.hashCode() : 0);
+ result = 31 * result + (mutationToken != null ? mutationToken.hashCode() : 0);
+ return result;
+ }
+
+ /**
+ * Helper method to write the current document state to the output stream for serialization purposes.
+ *
+ * @param stream the stream to write to.
+ * @throws IOException
+ */
+ protected void writeToSerializedStream(ObjectOutputStream stream) throws IOException {
+ stream.writeLong(cas);
+ stream.writeInt(expiry);
+ stream.writeUTF(id);
+ stream.writeObject(content);
+ stream.writeObject(mutationToken);
+ }
+
+ /**
+ * Helper method to create the document from an object input stream, used for serialization purposes.
+ *
+ * @param stream the stream to read from.
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ @SuppressWarnings("unchecked")
+ protected void readFromSerializedStream(final ObjectInputStream stream) throws IOException, ClassNotFoundException {
+ cas = stream.readLong();
+ expiry = stream.readInt();
+ id = stream.readUTF();
+ content = (T) stream.readObject();
+ mutationToken = (MutationToken) stream.readObject();
+ }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumn.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumn.java
new file mode 100644
index 00000000000..96c2a748a48
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumn.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+public class CouchbaseColumn implements Comparable {
+ // attributes keys of json document
+ public static final String ID = "id";
+ public static final String TABLE = "table";
+ public static final String COLUMNS = "columns";
+ public static final String KEY = "key";
+ public static final String VALUE = "value";
+ public static final String EXPIRE = "expire";
+ public static final String TTL = "ttl";
+ // instance members
+ private String key;
+ private String value;
+ private long expire;
+ private int ttl;
+
+ public CouchbaseColumn(String key, String value, long expire, int ttl) {
+ this.key = key;
+ this.value = value;
+ this.expire = expire;
+ this.ttl = ttl;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public long getExpire() {
+ return expire;
+ }
+
+ public int getTtl() {
+ return ttl;
+ }
+
+ public int compareTo(CouchbaseColumn o) {
+ return key.compareTo(o.key);
+ }
+
+ public boolean equals(Object anObject) {
+ if (this == anObject) {
+ return true;
+ }
+ if (anObject instanceof CouchbaseColumn) {
+ CouchbaseColumn anotherColumn = (CouchbaseColumn)anObject;
+ return key.equals(anotherColumn.key);
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumnConverter.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumnConverter.java
new file mode 100644
index 00000000000..91291e548bd
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseColumnConverter.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import org.janusgraph.diskstorage.StaticBuffer;
+import org.janusgraph.diskstorage.util.StaticArrayBuffer;
+
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+public class CouchbaseColumnConverter {//implements StaticBuffer.Factory {
+ public static final CouchbaseColumnConverter INSTANCE = new CouchbaseColumnConverter();
+ private static final char[] hexArray = "0123456789ABCDEF".toCharArray();
+
+
+// @Override
+// public String get(byte[] array, int offset, int limit) {
+// byte[] source = getSource(array, offset, limit);
+// return toString(source);
+// }
+//
+// public String toString(byte[] array) {
+// stringSerializer.
+//
+//
+//// StaticBuffer sb = StaticArrayBuffer.of(array);
+//// return KeyValueStoreUtil.getString(sb);
+// //return Base64.getEncoder().encodeToString(array);
+// }
+
+
+ public static String toString(byte[] bytes) {
+ char[] hexChars = new char[bytes.length * 2];
+ for (int j = 0; j < bytes.length; j++) {
+ int v = bytes[j] & 0xFF;
+ hexChars[j * 2] = hexArray[v >>> 4];
+ hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+ }
+ String s = new String(hexChars);
+
+// byte[] b = toByteArray(s);
+// if (!Arrays.equals(bytes, b)) {
+// System.out.println("fail");
+// }
+
+ return s;
+ }
+
+ public byte[] toByteArray(String value) {
+// final StaticBuffer buffer = toStaticBuffer(value);
+// return buffer.getBytes(0, buffer.length());
+
+// StaticBuffer sb = KeyValueStoreUtil.getBuffer(value);
+// String s = toString(sb);
+// System.out.println(s);
+// assert value.equals(s);
+// return sb.getBytes(0, sb.length());
+ int len = value == null ? 0 : value.length();
+ byte[] data = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ data[i / 2] = (byte) ((Character.digit(value.charAt(i), 16) << 4)
+ + Character.digit(value.charAt(i + 1), 16));
+ }
+ return data;
+ //return Base64.getDecoder().decode(value);
+ }
+
+ public static String toString(StaticBuffer buffer) {
+ return toString(buffer.as(StaticBuffer.ARRAY_FACTORY));
+ //return stringSerializer.read(buffer.asReadBuffer());
+ // return KeyValueStoreUtil.getString(buffer);
+ //return buffer.as(this);
+ }
+
+ public static String toId(String string) {
+ try {
+ byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
+ if (bytes.length > AbstractDocument.MAX_ID_LENGTH) {
+ MessageDigest digest = MessageDigest.getInstance("SHA-512");
+ digest.update(bytes);
+ return new StringBuilder(String.valueOf(bytes.length)).append(new String(digest.digest())).toString();
+ }
+ return string;
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public StaticBuffer toStaticBuffer(String value) {
+ return StaticArrayBuffer.of(toByteArray(value));
+// WriteByteBuffer writeBuffer = new WriteByteBuffer();
+// stringSerializer.write(writeBuffer, value);
+// return writeBuffer.getStaticBuffer();
+ //return KeyValueStoreUtil.getBuffer(value);
+// return new StaticArrayBuffer(toByteArray(value));
+ }
+
+ public String toId(StaticBuffer staticBuffer) {
+ return toId(toString(staticBuffer));
+ }
+
+// private byte[] getSource(byte[] array, int offset, int limit) {
+// if (offset == 0 && limit == array.length)
+// return array;
+// else
+// return Arrays.copyOfRange(array, offset, limit);
+// }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseConfigOptions.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseConfigOptions.java
new file mode 100644
index 00000000000..087dcf282e9
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseConfigOptions.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import org.janusgraph.diskstorage.configuration.ConfigNamespace;
+import org.janusgraph.diskstorage.configuration.ConfigOption;
+import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
+import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
+
+/**
+ * Configuration options for the Couchbase storage backend.
+ * These are managed under the 'cb' namespace in the configuration.
+ *
+ * @author Jagadesh Munta (jagadesh.munta@couchbase.com)
+ */
+@PreInitializeConfigOptions
+public interface CouchbaseConfigOptions {
+
+ ConfigNamespace CB_NS = new ConfigNamespace(
+ GraphDatabaseConfiguration.STORAGE_NS,
+ "cb",
+ "Couchbase storage backend options");
+
+ ConfigOption VERSION = new ConfigOption<>(
+ CB_NS,
+ "version",
+ "The version of the Couchbase cluster.",
+ ConfigOption.Type.LOCAL,
+ 703);
+
+ ConfigOption CLUSTER_CONNECT_STRING = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-string",
+ "Connect string to the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "couchbase://localhost");
+
+ ConfigOption CLUSTER_CONNECT_USERNAME = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-username",
+ "Username to the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "Administrator");
+
+ ConfigOption CLUSTER_CONNECT_PASSWORD = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-password",
+ "Password to the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "password");
+
+ ConfigOption CLUSTER_CONNECT_BUCKET = new ConfigOption<>(
+ CB_NS,
+ "cluster-connect-bucket",
+ "Bucket in the Couchbase cluster",
+ ConfigOption.Type.LOCAL,
+ "default");
+
+ ConfigOption CLUSTER_DEFAULT_SCOPE = new ConfigOption<>(
+ CB_NS,
+ "cluster-default-scope",
+ "Default Scope ",
+ ConfigOption.Type.LOCAL,
+ "_default");
+
+ ConfigOption CLUSTER_DEFAULT_COLLECTION = new ConfigOption<>(
+ CB_NS,
+ "cluster-default-collection",
+ "Default Collection",
+ ConfigOption.Type.LOCAL,
+ "_default");
+
+ ConfigOption ISOLATION_LEVEL = new ConfigOption<>(
+ CB_NS,
+ "isolation-level",
+ "Options are serializable, read_committed_no_write, read_committed_with_write",
+ ConfigOption.Type.LOCAL,
+ "serializable");
+
+ ConfigOption GET_RANGE_MODE = new ConfigOption<>(
+ CB_NS,
+ "get-range-mode",
+ "The mod of executing CB getRange, either `iterator` or `list`",
+ ConfigOption.Type.LOCAL,
+ "list"
+ );
+
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseDocumentMutation.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseDocumentMutation.java
new file mode 100644
index 00000000000..49371be20d5
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseDocumentMutation.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
+
+public class CouchbaseDocumentMutation {
+ private String table;
+ private String documentId;
+ private KCVMutation mutation;
+
+ public CouchbaseDocumentMutation(String table, String documentId, KCVMutation mutation) {
+ this.table = table;
+ this.documentId = documentId;
+ this.mutation = mutation;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getDocumentId() {
+ return documentId;
+ }
+
+ public String getHashId() {
+ return CouchbaseColumnConverter.toId(documentId);
+ }
+
+ public KCVMutation getMutation() {
+ return mutation;
+ }
+
+ public String getDocumentKey() {
+ return documentId;
+ }
+}
diff --git a/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseIndex.java b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseIndex.java
new file mode 100644
index 00000000000..5351eabf17a
--- /dev/null
+++ b/janusgraph-couchbase/src/main/java/org/janusgraph/diskstorage/couchbase/CouchbaseIndex.java
@@ -0,0 +1,505 @@
+/*
+ * Copyright 2023 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.janusgraph.diskstorage.couchbase;
+
+import com.couchbase.client.core.deps.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import com.couchbase.client.core.env.ConnectionStringPropertyLoader;
+import com.couchbase.client.core.env.IoConfig;
+import com.couchbase.client.core.env.SecurityConfig;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.ClusterOptions;
+import com.couchbase.client.java.Collection;
+import com.couchbase.client.java.Scope;
+import com.couchbase.client.java.env.ClusterEnvironment;
+import com.couchbase.client.java.json.JsonArray;
+import com.couchbase.client.java.manager.collection.CollectionSpec;
+import com.couchbase.client.java.query.QueryOptions;
+import com.couchbase.client.java.query.QueryResult;
+import com.couchbase.client.java.search.SearchOptions;
+import com.couchbase.client.java.search.SearchQuery;
+import com.couchbase.client.java.search.result.SearchResult;
+import org.apache.commons.lang3.StringUtils;
+import org.janusgraph.core.Cardinality;
+import org.janusgraph.core.attribute.Cmp;
+import org.janusgraph.core.attribute.Geo;
+import org.janusgraph.core.attribute.Geoshape;
+import org.janusgraph.core.attribute.Text;
+import org.janusgraph.core.schema.Mapping;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.BaseTransaction;
+import org.janusgraph.diskstorage.BaseTransactionConfig;
+import org.janusgraph.diskstorage.BaseTransactionConfigurable;
+import org.janusgraph.diskstorage.configuration.Configuration;
+import org.janusgraph.diskstorage.couchbase.lucene.Lucene2CouchbaseQLTranslator;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.diskstorage.indexing.IndexFeatures;
+import org.janusgraph.diskstorage.indexing.IndexMutation;
+import org.janusgraph.diskstorage.indexing.IndexProvider;
+import org.janusgraph.diskstorage.indexing.IndexQuery;
+import org.janusgraph.diskstorage.indexing.KeyInformation;
+import org.janusgraph.diskstorage.indexing.RawQuery;
+import org.janusgraph.graphdb.database.serialize.AttributeUtils;
+import org.janusgraph.graphdb.query.JanusGraphPredicate;
+import org.janusgraph.graphdb.query.condition.And;
+import org.janusgraph.graphdb.query.condition.Condition;
+import org.janusgraph.graphdb.query.condition.FixedCondition;
+import org.janusgraph.graphdb.query.condition.Not;
+import org.janusgraph.graphdb.query.condition.Or;
+import org.janusgraph.graphdb.query.condition.PredicateCondition;
+import org.janusgraph.graphdb.tinkerpop.optimize.step.Aggregation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.janusgraph.diskstorage.couchbase.CouchbaseConfigOptions.CLUSTER_CONNECT_STRING;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_CONNECT_BUCKET;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_CONNECT_PASSWORD;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_CONNECT_USERNAME;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_DEFAULT_FUZINESS;
+import static org.janusgraph.diskstorage.couchbase.CouchbaseIndexConfigOptions.CLUSTER_DEFAULT_SCOPE;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME;
+
+/**
+ * @author : chedim (chedim@couchbaser)
+ * @file : CouchbaseIndex
+ */
+public class CouchbaseIndex implements IndexProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseIndex.class);
+
+ private static final String STRING_MAPPING_SUFFIX = "__STRING";
+ static final String FTS_INDEX_NAME = "fulltext_index";
+ private final String name;
+ private final Cluster cluster;
+
+ private final Bucket bucket;
+
+ private final Scope scope;
+
+ private final int fuzziness;
+
+ private final String indexNamePrefix;
+
+ private final String indexNamespace;
+
+ public CouchbaseIndex(Configuration config) {
+ boolean isTLS = false;
+ final String connectString = config.get(CLUSTER_CONNECT_STRING);
+ if (connectString.startsWith("couchbases://")) {
+ isTLS = true;
+ }
+
+ ClusterEnvironment.Builder envBuilder = ClusterEnvironment.builder()
+ .ioConfig(IoConfig.enableDnsSrv(isTLS))
+ .securityConfig(SecurityConfig.enableTls(isTLS)
+ .trustManagerFactory(InsecureTrustManagerFactory.INSTANCE));
+
+ new ConnectionStringPropertyLoader(connectString).load(envBuilder);
+
+ ClusterEnvironment env = envBuilder.build();
+ name = config.get(INDEX_NAME);
+ cluster = Cluster.connect(connectString,
+ ClusterOptions.clusterOptions(config.get(CLUSTER_CONNECT_USERNAME),
+ config.get(CLUSTER_CONNECT_PASSWORD)).environment(env));
+
+ fuzziness = config.get(CLUSTER_DEFAULT_FUZINESS);
+
+ String bucketName = config.get(CLUSTER_CONNECT_BUCKET);
+ String scopeName = config.get(CLUSTER_DEFAULT_SCOPE);
+
+ bucket = cluster.bucket(bucketName);
+ scope = bucket.scope(scopeName);
+ indexNamePrefix = String.format("%s_%s", bucketName, scopeName);
+ indexNamespace = String.format("%s.%s", bucketName, scopeName);
+ }
+
+ @Override
+ public void register(String storeName, String key, KeyInformation information, BaseTransaction tx) throws BackendException {
+ ensureStorageExists(storeName);
+ CouchbaseIndexTransaction cbitx = (CouchbaseIndexTransaction) tx;
+ cbitx.register(storeName, key, information);
+ }
+
+ protected Collection getStorage(String name) {
+ Collection result = scope.collection(name);
+ if (result == null) {
+ bucket.collections().createCollection(CollectionSpec.create(name, scope.name()));
+ result = scope.collection(name);
+ }
+ return result;
+ }
+
+ protected String getIndexFullName(String name) {
+ return indexNamePrefix + "_" + name;
+ }
+
+ @Override
+ public void mutate(Map> mutations, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
+ mutations.keySet().forEach(this::ensureStorageExists);
+ ((CouchbaseIndexTransaction)tx).mutate(mutations, information);
+ }
+
+ @Override
+ public void restore(Map>> documents, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
+ documents.keySet().forEach(this::ensureStorageExists);
+ ((CouchbaseIndexTransaction)tx).restore(documents, information);
+ }
+
+ @Override
+ public Number queryAggregation(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx, Aggregation aggregation) throws BackendException {
+ final String aggType = aggregation.getType().name().toLowerCase();
+ final String fieldName = aggregation.getFieldName() == null ? "*" : aggregation.getFieldName();
+ return doQuery(String.format("%s(%s) as __agg_result", aggType, fieldName), query, information, tx)
+ .rowsAsObject().stream()
+ .findFirst().map(row -> row.getLong("__agg_result"))
+ .orElse(0L);
+ }
+
+ protected CollectionSpec ensureStorageExists(String name) {
+ return getCollection(name).orElseGet(() -> createCollection(name));
+ }
+
+ protected Optional getCollection(String name) {
+ return bucket.collections().getAllScopes()
+ .parallelStream()
+ .filter(scopeSpec -> scopeSpec.name().equals(scope.name()))
+ .flatMap(scopeSpec -> scopeSpec.collections().parallelStream())
+ .filter(collectionSpec -> collectionSpec.name().equals(name))
+ .findFirst();
+ }
+
+ protected CollectionSpec createCollection(String name) {
+ CollectionSpec collectionSpec = CollectionSpec.create(name, scope.name(), Duration.ZERO);
+ bucket.collections().createCollection(collectionSpec);
+
+ try {
+ Thread.sleep(2000);
+ scope.query("CREATE PRIMARY INDEX ON `" + name + "`");
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return collectionSpec;
+ }
+
+ protected List transformFilter(String storageName, Condition> condition) {
+ final List result = new LinkedList<>();
+ if (condition instanceof PredicateCondition) {
+ final PredicateCondition atom = (PredicateCondition) condition;
+ Object value = atom.getValue();
+ final String key = atom.getKey();
+ final JanusGraphPredicate predicate = atom.getPredicate();
+ final String fullIndexName = getIndexFullName(storageName);
+ if (value == null && predicate == Cmp.NOT_EQUAL) {
+ result.add(new QueryFilter(String.format("EXISTS %s", key)));
+ } else if (predicate == Cmp.EQUAL
+ || predicate == Cmp.NOT_EQUAL
+ || predicate == Cmp.GREATER_THAN
+ || predicate == Cmp.GREATER_THAN_EQUAL
+ || predicate == Cmp.LESS_THAN
+ || predicate == Cmp.LESS_THAN_EQUAL
+ ) {
+ result.add(new QueryFilter(String.format("%s %s ?", key, predicate), value));
+ } else if (predicate == Text.PREFIX || predicate == Text.NOT_PREFIX) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_PREFIX) {
+ statement.append("NOT ");
+ }
+ statement.append("POSITION(LOWER(")
+ .append(key)
+ .append("), LOWER(?)) = 0");
+
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if (predicate == Text.CONTAINS || predicate == Text.NOT_CONTAINS) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_CONTAINS) {
+ statement.append("NOT ");
+ }
+ statement.append("CONTAINS(LOWER(")
+ .append(key)
+ .append("), LOWER(?))");
+
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if ((predicate == Text.REGEX || predicate == Text.NOT_REGEX)) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_REGEX) {
+ statement.append("NOT ");
+ }
+ statement.append("REGEXP_MATCHES(")
+ .append(key)
+ .append(", ?)");
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if ((predicate == Text.CONTAINS_REGEX || predicate == Text.NOT_CONTAINS_REGEX)) {
+ StringBuilder statement = new StringBuilder();
+ if (predicate == Text.NOT_CONTAINS_REGEX) {
+ statement.append("NOT ");
+ }
+ statement.append("REGEXP_CONTAINS(")
+ .append(key)
+ .append(", ?)");
+ result.add(new QueryFilter(statement.toString(), value));
+ } else if (predicate instanceof Text) {
+ Text textPredicate = (Text) predicate;
+ String not = "";
+ if (textPredicate.name().toLowerCase(Locale.ROOT).startsWith("not_")) {
+ not = "NOT ";
+ }
+ result.add(new QueryFilter(
+ not + "SEARCH(?, ?)",
+ fullIndexName,
+ buildSearchQuery(key, predicate, value)
+ ));
+ } else if (predicate instanceof Geo) {
+ result.add(new QueryFilter(
+ "SEARCH(?, ?)",
+ fullIndexName,
+ buildGeoQuery(key, predicate, value)
+ ));
+ }else {
+ throw new IllegalArgumentException("Unsupported predicate: " + predicate.getClass().getCanonicalName());
+ }
+ } else if (condition instanceof Not) {
+ transformFilter(storageName, ((Not>) condition).getChild()).stream()
+ .map(qf -> new QueryFilter("NOT (" + qf.query() + ")", qf.arguments()))
+ .forEach(result::add);
+ } else if (condition instanceof And || condition instanceof Or) {
+ LinkedList statements = new LinkedList<>();
+ LinkedList