@@ -42,7 +42,7 @@ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfig
4242import org .apache .kafka .common .errors ._
4343import org .apache .kafka .common .internals .Topic
4444import org .apache .kafka .common .KafkaException
45- import org .apache .kafka .common .quota .{ClientQuotaAlteration , ClientQuotaEntity , ClientQuotaFilter }
45+ import org .apache .kafka .common .quota .{ClientQuotaAlteration , ClientQuotaEntity , ClientQuotaFilter , ClientQuotaFilterComponent }
4646import org .apache .kafka .common .record .FileRecords
4747import org .apache .kafka .common .requests .DeleteRecordsRequest
4848import org .apache .kafka .common .resource .{PatternType , ResourcePattern , ResourceType }
@@ -138,6 +138,63 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
138138 assertEquals(configEntries, quotaEntities.get(entity).asScala)
139139 }
140140
141+ @ Test
142+ def testDefaultNameQuotaIsNotEqualToDefaultQuota (): Unit = {
143+ val config = createConfig
144+ val defaultQuota = " <default>"
145+ client = Admin .create(config)
146+
147+ // "<default>" can not create default quota
148+ val userEntity = new ClientQuotaEntity (Map (ClientQuotaEntity .USER -> defaultQuota).asJava)
149+ val clientEntity = new ClientQuotaEntity (Map (ClientQuotaEntity .CLIENT_ID -> defaultQuota).asJava)
150+ val userAlterations = new ClientQuotaAlteration (userEntity,
151+ Collections .singleton(new ClientQuotaAlteration .Op (" consumer_byte_rate" , 10000D )))
152+ val clientAlterations = new ClientQuotaAlteration (clientEntity,
153+ Collections .singleton(new ClientQuotaAlteration .Op (" producer_byte_rate" , 10000D )))
154+ val alterations = List (userAlterations, clientAlterations)
155+ client.alterClientQuotas(alterations.asJava).all().get()
156+
157+ TestUtils .waitUntilTrue(() => {
158+ try {
159+ // check "<default>" as a default quota use
160+ val userDefaultQuotas = client.describeClientQuotas(ClientQuotaFilter .containsOnly(Collections .singletonList(
161+ ClientQuotaFilterComponent .ofDefaultEntity(ClientQuotaEntity .USER )))).entities().get()
162+ val clientDefaultQuotas = client.describeClientQuotas(ClientQuotaFilter .containsOnly(Collections .singletonList(
163+ ClientQuotaFilterComponent .ofDefaultEntity(ClientQuotaEntity .CLIENT_ID )))).entities().get()
164+
165+ // check "<default>" as a normal quota use
166+ val userNormalQuota = client.describeClientQuotas(ClientQuotaFilter .containsOnly(Collections .singletonList(
167+ ClientQuotaFilterComponent .ofEntity(ClientQuotaEntity .USER ,defaultQuota)))).entities().get()
168+ val clientNormalQuota = client.describeClientQuotas(ClientQuotaFilter .containsOnly(Collections .singletonList(
169+ ClientQuotaFilterComponent .ofEntity(ClientQuotaEntity .CLIENT_ID ,defaultQuota)))).entities().get()
170+
171+ userDefaultQuotas.size() == 0 && clientDefaultQuotas.size() == 0 && userNormalQuota.size() == 1 && clientNormalQuota.size() == 1
172+ } catch {
173+ case _ : Exception => false
174+ }
175+ }, " Timed out waiting for quota config to be propagated to all servers" )
176+
177+ // null can create default quota
178+ val userDefaultEntity = new ClientQuotaEntity (Map (ClientQuotaEntity .USER -> Option .empty[String ].orNull).asJava)
179+ client.alterClientQuotas(List (new ClientQuotaAlteration (userDefaultEntity, Collections .singleton(
180+ new ClientQuotaAlteration .Op (" consumer_byte_rate" , 100D )))).asJava).all().get()
181+ val clientDefaultEntity = new ClientQuotaEntity (Map (ClientQuotaEntity .CLIENT_ID -> Option .empty[String ].orNull).asJava)
182+ client.alterClientQuotas(List (new ClientQuotaAlteration (clientDefaultEntity, Collections .singleton(
183+ new ClientQuotaAlteration .Op (" producer_byte_rate" , 100D )))).asJava).all().get()
184+
185+ TestUtils .waitUntilTrue(() => {
186+ try {
187+ val userDefaultQuota = client.describeClientQuotas(ClientQuotaFilter .containsOnly(Collections .singletonList(
188+ ClientQuotaFilterComponent .ofDefaultEntity(ClientQuotaEntity .USER )))).entities().get()
189+ val clientDefaultQuota = client.describeClientQuotas(ClientQuotaFilter .containsOnly(Collections .singletonList(
190+ ClientQuotaFilterComponent .ofDefaultEntity(ClientQuotaEntity .CLIENT_ID )))).entities().get()
191+ userDefaultQuota.size() == 1 && clientDefaultQuota.size() == 1
192+ } catch {
193+ case _ : Exception => false
194+ }
195+ }, " Timed out waiting for quota config to be propagated to all servers" )
196+ }
197+
141198 @ Test
142199 def testDescribeUserScramCredentials (): Unit = {
143200 client = createAdminClient
0 commit comments