 */
 package org.apache.spark.deploy.k8s
 
-import scala.collection.mutable
+import java.util.Locale
 
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
-import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
 import org.apache.spark.deploy.k8s.submit._
-import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.util.Utils
 
-
-private[spark] sealed trait KubernetesRoleSpecificConf
-
-/*
- * Structure containing metadata for Kubernetes logic that builds a Spark driver.
- */
-private[spark] case class KubernetesDriverSpecificConf(
-    mainAppResource: MainAppResource,
-    mainClass: String,
-    appName: String,
-    appArgs: Seq[String],
-    pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf {
-
-  require(mainAppResource != null, "Main resource must be provided.")
-
-}
-
-/*
- * Structure containing metadata for Kubernetes logic that builds a Spark executor.
- */
-private[spark] case class KubernetesExecutorSpecificConf(
-    executorId: String,
-    driverPod: Option[Pod])
-  extends KubernetesRoleSpecificConf
-
-/*
- * Structure containing metadata for HADOOP_CONF_DIR customization
- */
-private[spark] case class HadoopConfSpec(
-    hadoopConfDir: Option[String],
-    hadoopConfigMapName: Option[String])
-
 /**
  * Structure containing metadata for Kubernetes logic to build Spark pods.
  */
-private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
-    sparkConf: SparkConf,
-    roleSpecificConf: T,
-    appResourceNamePrefix: String,
-    appId: String,
-    roleLabels: Map[String, String],
-    roleAnnotations: Map[String, String],
-    roleSecretNamesToMountPaths: Map[String, String],
-    roleSecretEnvNamesToKeyRefs: Map[String, String],
-    roleEnvs: Map[String, String],
-    roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
-    hadoopConfSpec: Option[HadoopConfSpec]) {
+private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
 
-  def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"
+  val resourceNamePrefix: String
+  def labels: Map[String, String]
+  def environment: Map[String, String]
+  def annotations: Map[String, String]
+  def secretEnvNamesToKeyRefs: Map[String, String]
+  def secretNamesToMountPaths: Map[String, String]
+  def volumes: Seq[KubernetesVolumeSpec]
 
-  def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
+  def appName: String = get("spark.app.name", "spark")
 
-  def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
-    new KubernetesHadoopDelegationTokenManager(conf, hConf)
+  def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"
 
-  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
+  def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"
 
-  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
+  def namespace: String = get(KUBERNETES_NAMESPACE)
 
-  def imagePullSecrets(): Seq[LocalObjectReference] = {
+  def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
+
+  def imagePullSecrets: Seq[LocalObjectReference] = {
     sparkConf
       .get(IMAGE_PULL_SECRETS)
-      .map(_.split(","))
-      .getOrElse(Array.empty[String])
-      .map(_.trim)
      .map { secret =>
        new LocalObjectReferenceBuilder().withName(secret).build()
      }
  }
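
Note: IMAGE_PULL_SECRETS is now read as a Seq[String], so the manual comma-splitting and trimming drop out. A minimal standalone sketch of the resulting mapping (the secret names below are made up for illustration):

    import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder

    // Hypothetical values; in the class these come from sparkConf.get(IMAGE_PULL_SECRETS).
    val secretNames = Seq("registry-secret", "backup-registry-secret")
    val refs = secretNames.map { secret =>
      new LocalObjectReferenceBuilder().withName(secret).build()
    }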
 
-  def nodeSelector(): Map[String, String] =
+  def nodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
 
+  def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config)
+
   def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
 
   def get(conf: String): String = sparkConf.get(conf)
@@ -112,125 +72,139 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
   def getOption(key: String): Option[String] = sparkConf.getOption(key)
 }
 
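For orientation, the base class now carries only the role-agnostic surface and delegates plain lookups to the wrapped SparkConf; each role supplies the abstract members. A minimal sketch of a concrete subclass under that contract (the class name and values are illustrative, not part of this change):

    // Illustrative only: the smallest subclass the new base type admits.
    class MinimalKubernetesConf(sparkConf: SparkConf) extends KubernetesConf(sparkConf) {
      override val resourceNamePrefix: String = "minimal"
      override def labels: Map[String, String] = Map.empty
      override def environment: Map[String, String] = Map.empty
      override def annotations: Map[String, String] = Map.empty
      override def secretEnvNamesToKeyRefs: Map[String, String] = Map.empty
      override def secretNamesToMountPaths: Map[String, String] = Map.empty
      override def volumes: Seq[KubernetesVolumeSpec] = Nil
    }
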
+private[spark] class KubernetesDriverConf(
+    sparkConf: SparkConf,
+    val appId: String,
+    val mainAppResource: MainAppResource,
+    val mainClass: String,
+    val appArgs: Array[String],
+    val pyFiles: Seq[String])
+  extends KubernetesConf(sparkConf) {
+
+  override val resourceNamePrefix: String = {
+    val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None
+    custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
+  }
+
+  override def labels: Map[String, String] = {
+    val presetLabels = Map(
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+
+    presetLabels.keys.foreach { key =>
+      require(
+        !driverCustomLabels.contains(key),
+        s"Label with key $key is not allowed as it is reserved for Spark bookkeeping operations.")
+    }
+
+    driverCustomLabels ++ presetLabels
+  }
+
+  override def environment: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
+  }
+
+  override def annotations: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+  }
+
+  override def secretNamesToMountPaths: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
+  }
+
+  override def secretEnvNamesToKeyRefs: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
+  }
+
+  override def volumes: Seq[KubernetesVolumeSpec] = {
+    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
+  }
+}
+
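The labels merge appends the preset pairs last, so Spark's bookkeeping keys always win, and the require() rejects any attempt to redefine them via spark.kubernetes.driver.label.*. A standalone sketch of those semantics (placeholder strings stand in for the real constants):

    // Placeholders for SPARK_APP_ID_LABEL / SPARK_ROLE_LABEL and their values.
    val presetLabels = Map("spark-app-selector" -> "spark-app-1", "spark-role" -> "driver")
    val customLabels = Map("team" -> "data-eng")
    // The class rejects custom labels that reuse a preset key...
    require(presetLabels.keys.forall(k => !customLabels.contains(k)))
    // ...and appends presets last so they would win on a collision anyway.
    val merged = customLabels ++ presetLabels
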
+private[spark] class KubernetesExecutorConf(
+    sparkConf: SparkConf,
+    val appId: String,
+    val executorId: String,
+    val driverPod: Option[Pod])
+  extends KubernetesConf(sparkConf) {
+
+  override val resourceNamePrefix: String = {
+    get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse(
+      KubernetesConf.getResourceNamePrefix(appName))
+  }
+
+  override def labels: Map[String, String] = {
+    val presetLabels = Map(
+      SPARK_EXECUTOR_ID_LABEL -> executorId,
+      SPARK_APP_ID_LABEL -> appId,
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
+
+    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+
+    presetLabels.keys.foreach { key =>
+      require(
+        !executorCustomLabels.contains(key),
+        s"Custom executor labels cannot contain $key as it is reserved for Spark.")
+    }
+
+    executorCustomLabels ++ presetLabels
+  }
+
+  override def environment: Map[String, String] = sparkConf.getExecutorEnv.toMap
+
+  override def annotations: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+  }
+
+  override def secretNamesToMountPaths: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+  }
+
+  override def secretEnvNamesToKeyRefs: Map[String, String] = {
+    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
+  }
+
+  override def volumes: Seq[KubernetesVolumeSpec] = {
+    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
+  }
+
+}
+
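Unlike the driver, executor environment variables come from SparkConf.getExecutorEnv, which collects the spark.executorEnv.* keys. A small sketch (the key and value are arbitrary examples):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)
      .set("spark.executorEnv.EXTRA_OPTS", "-Dsample=true") // hypothetical setting
    // getExecutorEnv strips the prefix: Map("EXTRA_OPTS" -> "-Dsample=true")
    val env = conf.getExecutorEnv.toMap
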
 private[spark] object KubernetesConf {
   def createDriverConf(
       sparkConf: SparkConf,
-      appName: String,
-      appResourceNamePrefix: String,
       appId: String,
       mainAppResource: MainAppResource,
       mainClass: String,
       appArgs: Array[String],
-      maybePyFiles: Option[String],
-      hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
-    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
-    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
-      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
-      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
-      "operations.")
-    val driverLabels = driverCustomLabels ++ Map(
-      SPARK_APP_ID_LABEL -> appId,
-      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
-    val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
-    val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
-    val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
-    val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
-    val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
-    // Also parse executor volumes in order to verify configuration
-    // before the driver pod is created
-    KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
-
-    val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
-    KubernetesUtils.requireNandDefined(
-      hadoopConfDir,
-      hadoopConfigMapName,
-      "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
-      "as the creation of an additional ConfigMap, when one is already specified is extraneous")
-    val hadoopConfSpec =
-      if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) {
-        Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName))
-      } else {
-        None
-      }
-    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
+      maybePyFiles: Option[String]): KubernetesDriverConf = {
+    // Parse executor volumes in order to verify configuration before the driver pod is created.
+    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
 
-
-    KubernetesConf(
-      sparkConf.clone(),
-      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles),
-      appResourceNamePrefix,
-      appId,
-      driverLabels,
-      driverAnnotations,
-      driverSecretNamesToMountPaths,
-      driverSecretEnvNamesToKeyRefs,
-      driverEnvs,
-      driverVolumes,
-      hadoopConfSpec)
+    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
+    new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs,
+      pyFiles)
   }
 
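The eager executor-volume parse kept at the top of createDriverConf is deliberate: parseVolumesWithPrefix no longer returns Try values to unwrap, so a malformed executor volume key fails here, at submit time, rather than when the first executor pod is built. For reference, a well-formed pair of settings this pass would accept (the volume name and paths are illustrative):

    // Hypothetical settings checked eagerly by createDriverConf.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.kubernetes.executor.volumes.hostPath.logs.mount.path", "/var/log/spark")
      .set("spark.kubernetes.executor.volumes.hostPath.logs.options.path", "/var/log/spark")
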
   def createExecutorConf(
       sparkConf: SparkConf,
       executorId: String,
       appId: String,
-      driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
-    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
-    require(
-      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
-      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
-    require(
-      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
-      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for " +
-        "Spark.")
-    require(
-      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
-      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
-    val executorLabels = Map(
-      SPARK_EXECUTOR_ID_LABEL -> executorId,
-      SPARK_APP_ID_LABEL -> appId,
-      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
-      executorCustomLabels
-    val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
-    val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
-    val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
-      sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
-    val executorEnv = sparkConf.getExecutorEnv.toMap
-    val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
-
-    // If no prefix is defined then we are in pure client mode
-    // (not the one used by cluster mode inside the container)
-    val appResourceNamePrefix = {
-      if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
-        getResourceNamePrefix(getAppName(sparkConf))
-      } else {
-        sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
-      }
-    }
+      driverPod: Option[Pod]): KubernetesExecutorConf = {
+    new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod)
+  }
 
-    KubernetesConf(
-      sparkConf.clone(),
-      KubernetesExecutorSpecificConf(executorId, driverPod),
-      appResourceNamePrefix,
-      appId,
-      executorLabels,
-      executorAnnotations,
-      executorMountSecrets,
-      executorEnvSecrets,
-      executorEnv,
-      executorVolumes,
-      None)
+  def getResourceNamePrefix(appName: String): String = {
+    val launchTime = System.currentTimeMillis()
+    s"$appName-$launchTime"
+      .trim
+      .toLowerCase(Locale.ROOT)
+      .replaceAll("\\s+", "-")
+      .replaceAll("\\.", "-")
+      .replaceAll("[^a-z0-9\\-]", "")
+      .replaceAll("-+", "-")
   }
 }
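
For reference, a hedged example of what the new shared sanitizer produces (the timestamp is whatever System.currentTimeMillis() returns at launch):

    KubernetesConf.getResourceNamePrefix("My Spark.App")
    // e.g. "my-spark-app-1541546549123": lowercased, whitespace and dots
    // become '-', other characters outside [a-z0-9-] are dropped, and
    // runs of '-' collapse to a single dash.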