
Kafka Partitions Per Pod Scaling #2581

Open
gabrieljones opened this issue Jan 29, 2022 · 22 comments
Labels: feature-request, needs-discussion, stale-bot-ignore

Comments

@gabrieljones

Proposal

Enhance the Kafka Scaler or create a new KEDA Scaler to support scaling Kafka by partitions per pod, as opposed to a simple increase in pod count.

Use-Case

Say I have 12 partitions for my topic.
At 3 pods I will have four partitions per pod:
4 + 4 + 4 = 12
At 4 pods I will have three partitions per pod:
3 + 3 + 3 + 3 = 12
If I naively scale to 5 pods, I will end up with 3 pods with 2 partitions each and 2 pods still with 3 partitions each:
2 + 2 + 2 + 3 + 3 = 12
To scale correctly I need to jump from 4 pods directly to 6 pods:
2 + 2 + 2 + 2 + 2 + 2 = 12
And instead of scaling to 7, 8, 9, 10, or 11, it should scale directly from 6 to 12.
1 + 1 + 1 + 1 + 1 + 1 + 1 + 1 + 1 + 1 + 1 + 1 = 12

As such, the list of valid scale targets is not a simple sequence from 0 to 12; rather, it is the sequence of divisors of the partition count:
0, 1, 2, 3, 4, 6, 12
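
For illustration only (this sketch is not from the issue and not part of KEDA), the rounding rule could look roughly like this in Go; validTargets and nextValidTarget are hypothetical helper names:

package main

import "fmt"

// validTargets returns the divisors of the partition count, i.e. the replica
// counts at which every pod gets the same number of partitions.
func validTargets(partitions int) []int {
    var targets []int
    for n := 1; n <= partitions; n++ {
        if partitions%n == 0 {
            targets = append(targets, n)
        }
    }
    return targets
}

// nextValidTarget rounds a naive desired replica count up to the nearest
// valid target.
func nextValidTarget(partitions, desired int) int {
    for _, t := range validTargets(partitions) {
        if t >= desired {
            return t
        }
    }
    return partitions
}

func main() {
    fmt.Println(validTargets(12))       // [1 2 3 4 6 12]
    fmt.Println(nextValidTarget(12, 5)) // 6
    fmt.Println(nextValidTarget(12, 7)) // 12
}

With 12 partitions, a naive desired count of 5 rounds up to 6 and a count of 7 rounds up to 12, matching the jumps described above.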

Anything else?

No response

@gabrieljones added the feature-request and needs-discussion labels on Jan 29, 2022
@gabrieljones
Author

Here is a list of pod scaling sequences based on partition count:

  4: 1, 2, 4
  6: 1, 2, 3, 6
  8: 1, 2, 4, 8
 10: 1, 2, 5, 10
 12: 1, 2, 3, 4, 6, 12
 18: 1, 2, 3, 6, 9, 18
 20: 1, 2, 4, 5, 10, 20
 24: 1, 2, 3, 4, 6, 8, 12, 24
 30: 1, 2, 3, 5, 6, 10, 15, 30
 36: 1, 2, 3, 4, 6, 9, 12, 18, 36
 48: 1, 2, 3, 4, 6, 8, 12, 16, 24, 48
 60: 1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30, 60
 72: 1, 2, 3, 4, 6, 8, 9, 12, 18, 24, 36, 72
 84: 1, 2, 3, 4, 6, 7, 12, 14, 21, 28, 42, 84
 90: 1, 2, 3, 5, 6, 9, 10, 15, 18, 30, 45, 90
 96: 1, 2, 3, 4, 6, 8, 12, 16, 24, 32, 48, 96
108: 1, 2, 3, 4, 6, 9, 12, 18, 27, 36, 54, 108
120: 1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 20, 24, 30, 40, 60, 120
168: 1, 2, 3, 4, 6, 7, 8, 12, 14, 21, 24, 28, 42, 56, 84, 168
180: 1, 2, 3, 4, 5, 6, 9, 10, 12, 15, 18, 20, 30, 36, 45, 60, 90, 180
240: 1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 16, 20, 24, 30, 40, 48, 60, 80, 120, 240
336: 1, 2, 3, 4, 6, 7, 8, 12, 14, 16, 21, 24, 28, 42, 48, 56, 84, 112, 168, 336
360: 1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 15, 18, 20, 24, 30, 36, 40, 45, 60, 72, 90, 120, 180, 360

Here is the Scala code that generated the above list:

// Divisors of x, in ascending order.
def divs(x: Int): Seq[Int] = (1 to x).filter(x % _ == 0)

// Keep a partition count only if it has at least as many divisors as any
// smaller count already kept, i.e. it offers at least as many scale steps.
var c = 0

val resF = for {
  i <- 4 to 360
  ds = divs(i)
  if ds.size >= c
  _ = { c = ds.size } // remember the largest divisor count seen so far
} yield i -> ds

// Print each partition count with its valid pod counts.
for ((i, ds) <- resF)
  println(f"$i%3d: ${ds.mkString(", ")}")

@rwkarg
Contributor

rwkarg commented Feb 2, 2022

This would be great for ensuring that only impactful scaling happens with Kafka consumers, especially since all replica counts between p/2 and p-1 are essentially a no-op for scaling: they all leave at least one consumer with 2 partitions (e.g. with 12 partitions, anywhere from 7 to 11 consumers still leaves some consumer owning 2 partitions).

@rwkarg
Contributor

rwkarg commented Feb 2, 2022

The Kafka scaler already fakes the metrics to cap the instance count at the partition count:

if !s.metadata.allowIdleConsumers {
    // don't scale out beyond the number of topicPartitions
    if (totalLag / s.metadata.lagThreshold) > totalTopicPartitions {
        totalLag = totalTopicPartitions * s.metadata.lagThreshold
    }
}

We could similarly fake the metric value to land on the desired instance count. I'm not saying whether that's a good idea, but I think it would work.

@zroubalik
Member

zroubalik commented Feb 2, 2022

That approach just caps the maximum reported metric, not the replicas. But yeah, you are right, it could be done somehow; it would need to mimic the HPA algorithm and report fake metrics, which isn't ideal IMHO.

But happy to see some POC :)

@rwkarg
Contributor

rwkarg commented Feb 2, 2022

It's essentially totalLag = desiredInstances * s.metadata.lagThreshold

The existing code caps at desiredInstances == the partition count, but if we figure out what the break points are for the topic's partition count (following logic similar to @gabrieljones's above), then we can compute what the naïve instance count would be (totalLag / s.metadata.lagThreshold), slide up to the next highest break point to get the desired instances, and then compute the fake totalLag = desiredInstances * s.metadata.lagThreshold.
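
For what it's worth, a minimal Go sketch of that computation (illustrative only; fakeLag, lagThreshold, and breakPoints are placeholder names, not the scaler's actual fields, and it assumes the HPA rounds the lag/threshold ratio up):

package main

import "fmt"

// fakeLag clamps the naive desired instance count up to the next break point
// and returns the lag value to report so the HPA lands exactly on that count.
// breakPoints must be the ascending break points for the topic (e.g. the
// divisors of the partition count), ending with the partition count itself.
func fakeLag(totalLag, lagThreshold int64, breakPoints []int64) int64 {
    naive := totalLag / lagThreshold
    if totalLag%lagThreshold != 0 {
        naive++ // the HPA effectively rounds up, so do the same here
    }
    desired := breakPoints[len(breakPoints)-1] // never exceed the partition count
    for _, bp := range breakPoints {
        if bp >= naive {
            desired = bp // slide up to the next highest break point
            break
        }
    }
    return desired * lagThreshold
}

func main() {
    // 12 partitions, lagThreshold 10, observed lag 75:
    // naive count is 8, next break point is 12, so report a lag of 120.
    fmt.Println(fakeLag(75, 10, []int64{1, 2, 3, 4, 6, 12})) // 120
}

With a lag threshold of 10 and 12 partitions, an observed lag of 75 gives a naive count of 8, which slides up to the break point 12, so the reported lag becomes 120.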

@zroubalik
Member

In theory this could work; I am curious how precise the calculation will be and whether we are able to really convince the HPA to scale accordingly.

@rwkarg keen to give it a try?

@tomkerkhove tomkerkhove moved this to Backlog in Roadmap - KEDA Core Feb 10, 2022
@tomkerkhove tomkerkhove moved this from To Do to Proposed in Roadmap - KEDA Core Feb 14, 2022
@stale

stale bot commented Apr 4, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.

stale bot added the stale label Apr 4, 2022
@rwkarg
Contributor

rwkarg commented Apr 7, 2022

Haven't had any time to look at this, but it would be a nice improvement.

stale bot removed the stale label Apr 7, 2022
@stale

stale bot commented Jun 6, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.

stale bot added the stale label Jun 6, 2022
@stale

stale bot commented Jun 13, 2022

This issue has been automatically closed due to inactivity.

@stale stale bot closed this as completed Jun 13, 2022
Repository owner moved this from Proposed to Ready To Ship in Roadmap - KEDA Core Jun 13, 2022
@zroubalik zroubalik reopened this Jun 14, 2022
Repository owner moved this from Ready To Ship to Proposed in Roadmap - KEDA Core Jun 14, 2022
stale bot removed the stale label Jun 14, 2022
@zroubalik added the stale-bot-ignore label Jun 14, 2022
@mwarkentin

Generally this is how we've scaled our Kafka consumers when managing the replica count manually, so a way to do the same with autoscaling would be great; it would help guarantee an even distribution of work across our consumers (assuming a relatively even distribution of messages across partitions).

@zroubalik
Member

@mwarkentin are you willing to contribute this? :)

@stale

stale bot commented Nov 30, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.

stale bot added the stale label Nov 30, 2022
@mwarkentin

Looks like stalebot is ignoring the stale-bot-ignore label?

stale bot removed the stale label Nov 30, 2022
@gabrieljones
Author

I need to revise my sequence algorithm. It only considers scale steps where all pods get the same number of partitions.

For example, at 8 total partitions, scaling from 2 to 3 pods is still impactful, since you go from 4 + 4 to 3 + 3 + 2.
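
For illustration only (not from this thread), the revised rule could be sketched in Go as: a replica count is a break point whenever it lowers the maximum number of partitions any single pod owns; breakPoints is a hypothetical helper name.

package main

import "fmt"

// breakPoints returns the replica counts at which the maximum number of
// partitions per pod actually decreases compared to the previous count.
func breakPoints(partitions int) []int {
    var points []int
    prevMax := partitions + 1 // larger than any possible per-pod maximum
    for n := 1; n <= partitions; n++ {
        maxPerPod := (partitions + n - 1) / n // ceil(partitions / n)
        if maxPerPod < prevMax {
            points = append(points, n)
            prevMax = maxPerPod
        }
    }
    return points
}

func main() {
    fmt.Println(breakPoints(8))  // [1 2 3 4 8]: 2 -> 3 is impactful, 4,4 becomes 3,3,2
    fmt.Println(breakPoints(12)) // [1 2 3 4 6 12]
}

For 8 partitions this yields 1, 2, 3, 4, 8, which includes the 2 to 3 step above, while for 12 partitions it still yields the divisor sequence 1, 2, 3, 4, 6, 12.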

@zroubalik
Member

@gabrieljones ack, are you willing to contribute this once you are okay with the algorithm?

@patelvp

patelvp commented Nov 14, 2024

@zroubalik, I would like to take it on. Would be a nice starter project for me.

@zroubalik
Member

@patelvp great!

@patelvp

patelvp commented Nov 18, 2024

@zroubalik I have most of the pieces here #6343.
I will complete the PR description and tests once I have direction. The only question I have is that I would need to know the HPA's current number of replicas here, which would essentially determine what the next step should be. I'm not sure there is a clean way to know the HPA replica count in the scaler code; the way I understand the architecture, the scalers only provide metrics for the HPA to scale on.

@gabrieljones
Author

gabrieljones commented Nov 20, 2024

Not sure if this helps. I have been working on a multi-k8s-cluster-aware direct autoscaler. It does not use metrics and it bypasses the HPA logic altogether, using the HPA merely to adjust minReplicas: it directly adjusts hpa.spec.minReplicas on the HPAs for the given deployment in each cluster via io.fabric8.kubernetes.client.KubernetesClient, and evenly distributes the desired consumer count across the multiple k8s clusters. It talks directly to the apiservers to get and set the current minReplicas on each HPA, and directly to the Kafka cluster via org.apache.kafka.clients.admin.AdminClient to get the latest offsets and consumer group offsets. It also gets the current consumer group membership and compares it with the minReplicas set on the HPA.

@patelvp

patelvp commented Nov 21, 2024


@gabrieljones That is a great idea that could work. I can extend it to just pull the current replica count, because that is what I need to make this work more natively.
I just want to make sure that would be an acceptable solution before investing time in working on it; I'm not sure what the maintainers think about this approach.

@zroubalik
Member

TBH I am not completely sure we should pull the actual replica count - there might be multiple scalers defined, so the final replica count might be affected by another scaler. We should IMHO do the calculation in a similar way to the fallback feature.

@gabrieljones this is interesting; is the scaler public? Can we check it out?
