@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
642642configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
643643about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
644644
645- The process is initiated by Spark's Kafka delegation token provider. When ` spark.kafka.bootstrap.servers ` ,
645+ The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
646646Spark considers the following log in options, in order of preference:
647- - ** JAAS login configuration**
647+ - **JAAS login configuration**, please see the example below.
648648- **Keytab file**, such as,
649649
650650 ./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):
669669
670670After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
671671The delegation token uses the `SCRAM` login module for authentication and because of that the appropriate
672- ` sasl.mechanism ` has to be configured on source/sink (it must match with Kafka broker configuration):
673-
674- <div class =" codetabs " >
675- <div data-lang =" scala " markdown =" 1 " >
676- {% highlight scala %}
677-
678- // Setting on Kafka Source for Streaming Queries
679- val df = spark
680- .readStream
681- .format("kafka")
682- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
683- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
684- .option("subscribe", "topic1")
685- .load()
686- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
687- .as[ (String, String)]
688-
689- // Setting on Kafka Source for Batch Queries
690- val df = spark
691- .read
692- .format("kafka")
693- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
694- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
695- .option("subscribe", "topic1")
696- .load()
697- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
698- .as[ (String, String)]
699-
700- // Setting on Kafka Sink for Streaming Queries
701- val ds = df
702- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
703- .writeStream
704- .format("kafka")
705- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
706- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
707- .option("topic", "topic1")
708- .start()
709-
710- // Setting on Kafka Sink for Batch Queries
711- val ds = df
712- .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
713- .write
714- .format("kafka")
715- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
716- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
717- .save()
718-
719- {% endhighlight %}
720- </div >
721- <div data-lang =" java " markdown =" 1 " >
722- {% highlight java %}
723-
724- // Setting on Kafka Source for Streaming Queries
725- Dataset<Row > df = spark
726- .readStream()
727- .format("kafka")
728- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
729- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
730- .option("subscribe", "topic1")
731- .load();
732- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
733-
734- // Setting on Kafka Source for Batch Queries
735- Dataset<Row > df = spark
736- .read()
737- .format("kafka")
738- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
739- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
740- .option("subscribe", "topic1")
741- .load();
742- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
743-
744- // Setting on Kafka Sink for Streaming Queries
745- StreamingQuery ds = df
746- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
747- .writeStream()
748- .format("kafka")
749- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
750- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
751- .option("topic", "topic1")
752- .start();
753-
754- // Setting on Kafka Sink for Batch Queries
755- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
756- .write()
757- .format("kafka")
758- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ")
759- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
760- .option("topic", "topic1")
761- .save();
762-
763- {% endhighlight %}
764- </div >
765- <div data-lang =" python " markdown =" 1 " >
766- {% highlight python %}
767-
768- // Setting on Kafka Source for Streaming Queries
769- df = spark \
770- .readStream \
771- .format("kafka") \
772- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ") \
773- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
774- .option("subscribe", "topic1") \
775- .load()
776- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
777-
778- // Setting on Kafka Source for Batch Queries
779- df = spark \
780- .read \
781- .format("kafka") \
782- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ") \
783- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
784- .option("subscribe", "topic1") \
785- .load()
786- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
787-
788- // Setting on Kafka Sink for Streaming Queries
789- ds = df \
790- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
791- .writeStream \
792- .format("kafka") \
793- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ") \
794- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
795- .option("topic", "topic1") \
796- .start()
797-
798- // Setting on Kafka Sink for Batch Queries
799- df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
800- .write \
801- .format("kafka") \
802- .option("kafka.bootstrap.servers", "host1: port1 ,host2: port2 ") \
803- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
804- .option("topic", "topic1") \
805- .save()
806-
807- {% endhighlight %}
808- </div >
809- </div >
672+ `spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
673+ must match the Kafka broker configuration.
810674
811675When delegation token is available on an executor it can be overridden with JAAS login configuration.
812676
0 commit comments