|
26 | 26 | import java.lang.management.ManagementFactory; |
27 | 27 | import java.lang.management.ThreadInfo; |
28 | 28 | import java.lang.management.ThreadMXBean; |
29 | | -import java.time.Duration; |
30 | | -import java.util.ArrayList; |
31 | 29 | import java.util.Arrays; |
32 | 30 | import java.util.Collections; |
33 | 31 | import java.util.List; |
34 | 32 | import java.util.Map; |
35 | | -import java.util.Objects; |
36 | | -import java.util.UUID; |
37 | 33 | import java.util.concurrent.CompletableFuture; |
38 | | -import java.util.concurrent.TimeoutException; |
39 | 34 | import java.util.stream.Collectors; |
40 | 35 | import javax.ws.rs.DELETE; |
41 | 36 | import javax.ws.rs.DefaultValue; |
|
50 | 45 | import javax.ws.rs.core.Response.Status; |
51 | 46 | import org.apache.commons.lang.StringUtils; |
52 | 47 | import org.apache.pulsar.PulsarVersion; |
53 | | -import org.apache.pulsar.broker.PulsarServerException; |
54 | | -import org.apache.pulsar.broker.PulsarService; |
55 | 48 | import org.apache.pulsar.broker.PulsarService.State; |
56 | 49 | import org.apache.pulsar.broker.ServiceConfiguration; |
57 | 50 | import org.apache.pulsar.broker.admin.AdminResource; |
58 | 51 | import org.apache.pulsar.broker.loadbalance.LeaderBroker; |
59 | | -import org.apache.pulsar.broker.namespace.NamespaceService; |
60 | | -import org.apache.pulsar.broker.service.Subscription; |
61 | | -import org.apache.pulsar.broker.service.Topic; |
62 | 52 | import org.apache.pulsar.broker.web.RestException; |
63 | | -import org.apache.pulsar.client.api.MessageId; |
64 | | -import org.apache.pulsar.client.api.Producer; |
65 | | -import org.apache.pulsar.client.api.PulsarClient; |
66 | | -import org.apache.pulsar.client.api.Reader; |
67 | | -import org.apache.pulsar.client.api.Schema; |
68 | 53 | import org.apache.pulsar.common.conf.InternalConfigurationData; |
69 | | -import org.apache.pulsar.common.naming.NamespaceName; |
70 | 54 | import org.apache.pulsar.common.naming.TopicVersion; |
71 | 55 | import org.apache.pulsar.common.policies.data.BrokerInfo; |
72 | 56 | import org.apache.pulsar.common.policies.data.BrokerOperation; |
|
81 | 65 | */ |
82 | 66 | public class BrokersBase extends AdminResource { |
83 | 67 | private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class); |
84 | | - public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck"; |
85 | 68 | // log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes |
86 | 69 | // to prevent excessive logging |
87 | 70 | private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L; |
88 | | - // there is a timeout of 60 seconds default in the client(readTimeoutMs), so we need to set the timeout |
89 | | - // a bit shorter than 60 seconds to avoid the client timeout exception thrown before the server timeout exception. |
90 | | - // or we can't propagate the server timeout exception to the client. |
91 | | - private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58); |
92 | | - private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION = |
93 | | - FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)"); |
94 | 71 | private static volatile long threadDumpLoggedTimestamp; |
95 | 72 |
|
96 | 73 | @GET |
@@ -385,16 +362,21 @@ public void isReady(@Suspended AsyncResponse asyncResponse) { |
385 | 362 | @ApiResponse(code = 307, message = "Current broker is not the target broker"), |
386 | 363 | @ApiResponse(code = 403, message = "Don't have admin permission"), |
387 | 364 | @ApiResponse(code = 404, message = "Cluster doesn't exist"), |
388 | | - @ApiResponse(code = 500, message = "Internal server error")}) |
| 365 | + @ApiResponse(code = 500, message = "Internal server error"), |
| 366 | + @ApiResponse(code = 503, message = "Service unavailable")}) |
389 | 367 | public void healthCheck(@Suspended AsyncResponse asyncResponse, |
390 | 368 | @ApiParam(value = "Topic Version") |
391 | 369 | @QueryParam("topicVersion") TopicVersion topicVersion, |
392 | 370 | @QueryParam("brokerId") String brokerId) { |
| 371 | + if (pulsar().getState() == State.Closed || pulsar().getState() == State.Closing) { |
| 372 | + asyncResponse.resume(Response.status(Status.SERVICE_UNAVAILABLE).build()); |
| 373 | + return; |
| 374 | + } |
393 | 375 | validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), StringUtils.isBlank(brokerId) |
394 | 376 | ? pulsar().getBrokerId() : brokerId, BrokerOperation.HEALTH_CHECK) |
395 | | - .thenAccept(__ -> checkDeadlockedThreads()) |
396 | 377 | .thenCompose(__ -> maybeRedirectToBroker( |
397 | 378 | StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId)) |
| 379 | + .thenAccept(__ -> checkDeadlockedThreads()) |
398 | 380 | .thenCompose(__ -> internalRunHealthCheck(topicVersion)) |
399 | 381 | .thenAccept(__ -> { |
400 | 382 | LOG.info("[{}] Successfully run health check.", clientAppId()); |
@@ -432,143 +414,8 @@ private void checkDeadlockedThreads() { |
432 | 414 | } |
433 | 415 | } |
434 | 416 |
|
435 | | - public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) { |
436 | | - NamespaceName namespaceName = isV2 |
437 | | - ? NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration) |
438 | | - : NamespaceService.getHeartbeatNamespace(brokerId, configuration); |
439 | | - return String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX); |
440 | | - } |
441 | | - |
442 | 417 | private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) { |
443 | | - return internalRunHealthCheck(topicVersion, pulsar(), clientAppId()); |
444 | | - } |
445 | | - |
446 | | - |
447 | | - public static CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion, PulsarService pulsar, |
448 | | - String clientAppId) { |
449 | | - NamespaceName namespaceName = (topicVersion == TopicVersion.V2) |
450 | | - ? NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()) |
451 | | - : NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()); |
452 | | - String brokerId = pulsar.getBrokerId(); |
453 | | - final String topicName = |
454 | | - getHeartbeatTopicName(brokerId, pulsar.getConfiguration(), (topicVersion == TopicVersion.V2)); |
455 | | - LOG.info("[{}] Running healthCheck with topic={}", clientAppId, topicName); |
456 | | - final String messageStr = UUID.randomUUID().toString(); |
457 | | - final String subscriptionName = "healthCheck-" + messageStr; |
458 | | - // create non-partitioned topic manually and close the previous reader if present. |
459 | | - return pulsar.getBrokerService().getTopic(topicName, true) |
460 | | - .thenCompose(topicOptional -> { |
461 | | - if (!topicOptional.isPresent()) { |
462 | | - LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", |
463 | | - clientAppId, topicName); |
464 | | - throw new RestException(Status.NOT_FOUND, |
465 | | - String.format("Topic [%s] not found after create.", topicName)); |
466 | | - } |
467 | | - PulsarClient client; |
468 | | - try { |
469 | | - client = pulsar.getClient(); |
470 | | - } catch (PulsarServerException e) { |
471 | | - LOG.error("[{}] Fail to run health check while get client.", clientAppId); |
472 | | - throw new RestException(e); |
473 | | - } |
474 | | - CompletableFuture<Void> resultFuture = new CompletableFuture<>(); |
475 | | - client.newProducer(Schema.STRING).topic(topicName).createAsync() |
476 | | - .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName) |
477 | | - .subscriptionName(subscriptionName) |
478 | | - .startMessageId(MessageId.latest) |
479 | | - .createAsync().exceptionally(createException -> { |
480 | | - producer.closeAsync().exceptionally(ex -> { |
481 | | - LOG.error("[{}] Close producer fail while heath check.", clientAppId); |
482 | | - return null; |
483 | | - }); |
484 | | - throw FutureUtil.wrapToCompletionException(createException); |
485 | | - }).thenCompose(reader -> producer.sendAsync(messageStr) |
486 | | - .thenCompose(__ -> FutureUtil.addTimeoutHandling( |
487 | | - healthCheckRecursiveReadNext(reader, messageStr), |
488 | | - HEALTH_CHECK_READ_TIMEOUT, pulsar.getBrokerService().executor(), |
489 | | - () -> HEALTH_CHECK_TIMEOUT_EXCEPTION)) |
490 | | - .whenComplete((__, ex) -> { |
491 | | - closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName, |
492 | | - clientAppId) |
493 | | - .whenComplete((unused, innerEx) -> { |
494 | | - if (ex != null) { |
495 | | - resultFuture.completeExceptionally(ex); |
496 | | - } else { |
497 | | - resultFuture.complete(null); |
498 | | - } |
499 | | - }); |
500 | | - } |
501 | | - )) |
502 | | - ).exceptionally(ex -> { |
503 | | - resultFuture.completeExceptionally(ex); |
504 | | - return null; |
505 | | - }); |
506 | | - return resultFuture; |
507 | | - }); |
508 | | - } |
509 | | - |
510 | | - private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader, |
511 | | - Topic topic, String subscriptionName) { |
512 | | - return closeAndReCheck(producer, reader, topic, subscriptionName, clientAppId()); |
513 | | - } |
514 | | - |
515 | | - /** |
516 | | - * Close producer and reader and then to re-check if this operation is success. |
517 | | - * |
518 | | - * Re-check |
519 | | - * - Producer: If close fails we will print error log to notify user. |
520 | | - * - Consumer: If close fails we will force delete subscription. |
521 | | - * |
522 | | - * @param producer Producer |
523 | | - * @param reader Reader |
524 | | - * @param topic Topic |
525 | | - * @param subscriptionName Subscription name |
526 | | - */ |
527 | | - private static CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader, |
528 | | - Topic topic, String subscriptionName, String clientAppId) { |
529 | | - // no matter exception or success, we still need to |
530 | | - // close producer/reader |
531 | | - CompletableFuture<Void> producerFuture = producer.closeAsync(); |
532 | | - CompletableFuture<Void> readerFuture = reader.closeAsync(); |
533 | | - List<CompletableFuture<Void>> futures = new ArrayList<>(2); |
534 | | - futures.add(producerFuture); |
535 | | - futures.add(readerFuture); |
536 | | - return FutureUtil.waitForAll(Collections.unmodifiableList(futures)) |
537 | | - .exceptionally(closeException -> { |
538 | | - if (readerFuture.isCompletedExceptionally()) { |
539 | | - LOG.error("[{}] Close reader fail while heath check.", clientAppId); |
540 | | - Subscription subscription = |
541 | | - topic.getSubscription(subscriptionName); |
542 | | - // re-check subscription after reader close |
543 | | - if (subscription != null) { |
544 | | - LOG.warn("[{}] Force delete subscription {} " |
545 | | - + "when it still exists after the" |
546 | | - + " reader is closed.", |
547 | | - clientAppId, subscription); |
548 | | - subscription.deleteForcefully() |
549 | | - .exceptionally(ex -> { |
550 | | - LOG.error("[{}] Force delete subscription fail" |
551 | | - + " while health check", |
552 | | - clientAppId, ex); |
553 | | - return null; |
554 | | - }); |
555 | | - } |
556 | | - } else { |
557 | | - // producer future fail. |
558 | | - LOG.error("[{}] Close producer fail while heath check.", clientAppId); |
559 | | - } |
560 | | - return null; |
561 | | - }); |
562 | | - } |
563 | | - |
564 | | - private static CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) { |
565 | | - return reader.readNextAsync() |
566 | | - .thenCompose(msg -> { |
567 | | - if (!Objects.equals(content, msg.getValue())) { |
568 | | - return healthCheckRecursiveReadNext(reader, content); |
569 | | - } |
570 | | - return CompletableFuture.completedFuture(null); |
571 | | - }); |
| 418 | + return pulsar().runHealthCheck(topicVersion, clientAppId()); |
572 | 419 | } |
573 | 420 |
|
574 | 421 | private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { |
|
0 commit comments