@@ -22,13 +22,14 @@ import org.apache.kafka.common.acl._
2222import org .apache .kafka .common .acl .AclOperation .{ALL , ALTER , ALTER_CONFIGS , CLUSTER_ACTION , CREATE , DELETE , DESCRIBE , IDEMPOTENT_WRITE }
2323import org .apache .kafka .common .acl .AclPermissionType .{ALLOW , DENY }
2424import org .apache .kafka .common .config .{ConfigResource , SaslConfigs , TopicConfig }
25- import org .apache .kafka .common .errors .{ClusterAuthorizationException , InvalidRequestException , TopicAuthorizationException , UnknownTopicOrPartitionException }
25+ import org .apache .kafka .common .errors .{ClusterAuthorizationException , DelegationTokenExpiredException , DelegationTokenNotFoundException , InvalidRequestException , TopicAuthorizationException , UnknownTopicOrPartitionException }
2626import org .apache .kafka .common .resource .PatternType .LITERAL
2727import org .apache .kafka .common .resource .ResourceType .{GROUP , TOPIC }
2828import org .apache .kafka .common .resource .{PatternType , Resource , ResourcePattern , ResourcePatternFilter , ResourceType }
2929import org .apache .kafka .common .security .auth .{KafkaPrincipal , SecurityProtocol }
30+ import org .apache .kafka .common .security .token .delegation .DelegationToken
3031import org .apache .kafka .security .authorizer .AclEntry .{WILDCARD_HOST , WILDCARD_PRINCIPAL_STRING }
31- import org .apache .kafka .server .config .{ServerConfigs , ZkConfigs }
32+ import org .apache .kafka .server .config .{DelegationTokenManagerConfigs , ServerConfigs , ZkConfigs }
3233import org .apache .kafka .metadata .authorizer .StandardAuthorizer
3334import org .apache .kafka .storage .internals .log .LogConfig
3435import org .junit .jupiter .api .Assertions ._
@@ -67,6 +68,10 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
6768 this .serverConfig.setProperty(AclAuthorizer .SuperUsersProp , kafkaPrincipal.toString)
6869 }
6970
71+ // Enable delegationTokenControlManager
72+ serverConfig.setProperty(DelegationTokenManagerConfigs .DELEGATION_TOKEN_SECRET_KEY_CONFIG , " 123" )
73+ serverConfig.setProperty(DelegationTokenManagerConfigs .DELEGATION_TOKEN_MAX_LIFETIME_CONFIG , " 5000" )
74+
7075 setUpSasl()
7176 super .setUp(testInfo)
7277 setInitialAcls()
@@ -520,6 +525,50 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
520525 }
521526 }
522527
528+ @ ParameterizedTest
529+ @ ValueSource (strings = Array (" zk" , " kraft" ))
530+ def testExpireDelegationToken (quorum : String ): Unit = {
531+ client = createAdminClient
532+ val createDelegationTokenOptions = new CreateDelegationTokenOptions ()
533+
534+ // Test expiration for non-exists token
535+ TestUtils .assertFutureExceptionTypeEquals(
536+ client.expireDelegationToken(" " .getBytes()).expiryTimestamp(),
537+ classOf [DelegationTokenNotFoundException ]
538+ )
539+
540+ // Test expiring the token immediately
541+ val token1 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get()
542+ TestUtils .retry(maxWaitMs = 1000 ) { assertTrue(expireTokenOrFailWithAssert(token1, - 1 ) < System .currentTimeMillis()) }
543+
544+ // Test expiring the expired token
545+ val token2 = client.createDelegationToken(createDelegationTokenOptions.maxlifeTimeMs(1000 )).delegationToken().get()
546+ // Ensure current time > maxLifeTimeMs of token
547+ Thread .sleep(1000 )
548+ TestUtils .assertFutureExceptionTypeEquals(
549+ client.expireDelegationToken(token2.hmac(), new ExpireDelegationTokenOptions ().expiryTimePeriodMs(1 )).expiryTimestamp(),
550+ classOf [DelegationTokenExpiredException ]
551+ )
552+
553+ // Ensure expiring the expired token with negative expiryTimePeriodMs will not throw exception
554+ assertDoesNotThrow(() => expireTokenOrFailWithAssert(token2, - 1 ))
555+
556+ // Test shortening the expiryTimestamp
557+ val token3 = client.createDelegationToken(createDelegationTokenOptions).delegationToken().get()
558+ TestUtils .retry(1000 ) { assertTrue(expireTokenOrFailWithAssert(token3, 200 ) < token3.tokenInfo().expiryTimestamp()) }
559+ }
560+
561+ private def expireTokenOrFailWithAssert (token : DelegationToken , expiryTimePeriodMs : Long ): Long = {
562+ try {
563+ client.expireDelegationToken(token.hmac(), new ExpireDelegationTokenOptions ().expiryTimePeriodMs(expiryTimePeriodMs))
564+ .expiryTimestamp().get()
565+ } catch {
566+ // If metadata is not synced yet, the response will contain an errorCode, causing an exception to be thrown.
567+ // This wrapper is designed to work with TestUtils.retry
568+ case _ : ExecutionException => throw new AssertionError (" Metadata not sync yet." )
569+ }
570+ }
571+
523572 private def describeConfigs (topic : String ): Iterable [ConfigEntry ] = {
524573 val topicResource = new ConfigResource (ConfigResource .Type .TOPIC , topic)
525574 var configEntries : Iterable [ConfigEntry ] = null
0 commit comments