Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Options to support 'NoOp' when key collisions occur during an append. #275

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.spark.sql.redis

import java.nio.charset.StandardCharsets.UTF_8

import redis.clients.jedis.Pipeline
import redis.clients.jedis.params.SetParams

class BinaryNXRedisPersistence extends BinaryRedisPersistence {

override def save(pipeline: Pipeline, key: String, value: Array[Byte], ttl: Int): Unit = {
val keyBytes = key.getBytes(UTF_8)
val setParameters = SetParams.setParams()
.nx()
.ex(ttl)
pipeline.set(keyBytes, value, setParameters)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.spark.sql.redis

import redis.clients.jedis.Pipeline

class HashNXRedisPersistence extends HashRedisPersistence {

override def save(pipeline: Pipeline, key: String, value: Any, ttl: Int): Unit = {
val javaValue = value.asInstanceOf[Map[String, String]]
javaValue.keySet.foreach( field => pipeline.hsetnx(key, field, javaValue(field)))
if (ttl > 0) {
pipeline.expire(key, ttl)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ object RedisPersistence {

private val providers =
Map(SqlOptionModelBinary -> new BinaryRedisPersistence(),
SqlOptionModelHash -> new HashRedisPersistence())
SqlOptionModelHash -> new HashRedisPersistence(),
SqlOptionModelHashNoOpIfExists -> new HashNXRedisPersistence(),
SqlOptionModelBinaryNoOpIfExists -> new BinaryNXRedisPersistence())

def apply(model: String): RedisPersistence[Any] = {
// use hash model by default
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/org/apache/spark/sql/redis/redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package object redis {
val SqlOptionModel = "model"
val SqlOptionModelBinary = "binary"
val SqlOptionModelHash = "hash"
val SqlOptionModelHashNoOpIfExists = "hashnx"
val SqlOptionModelBinaryNoOpIfExists = "binarynx"
val SqlOptionInferSchema = "infer.schema"
val SqlOptionKeyColumn = "key.column"
val SqlOptionTTL = "ttl"
Expand Down