Skip to content

Commit

Permalink
core rebuild and JDK version check
Browse files Browse the repository at this point in the history
  • Loading branch information
EachannChan committed Nov 15, 2024
1 parent fe40c63 commit f449863
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,14 @@ public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
ThreadPoolStats poolStats = convertCommon(executor);
poolStats.setPoolName(wrapper.getThreadPoolName());
poolStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
poolStats.setRunTimeoutCount(provider.getRunTimeoutCount());
poolStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
poolStats.setRejectCount(provider.getRejectedTaskCount());
poolStats.setDynamic(executor instanceof DtpExecutor);

if (!wrapper.isVirtualThreadExecutor()) {
poolStats.setRunTimeoutCount(provider.getRunTimeoutCount());
poolStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
poolStats.setRejectCount(provider.getRejectedTaskCount());
}

poolStats.setDynamic(executor instanceof DtpExecutor);
poolStats.setTps(performanceSnapshot.getTps());
poolStats.setAvg(performanceSnapshot.getAvg());
poolStats.setMaxRt(performanceSnapshot.getMaxRt());
Expand All @@ -83,32 +86,6 @@ public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
return poolStats;
}

public static VTExecutorStats toVTExecutorMetrics(ExecutorWrapper wrapper) {
ExecutorAdapter<?> executor = wrapper.getExecutor();
if (executor == null) {
return null;
}
ThreadPoolStatProvider provider = wrapper.getThreadPoolStatProvider();
PerformanceProvider performanceProvider = provider.getPerformanceProvider();
val performanceSnapshot = performanceProvider.getSnapshotAndReset();
VTExecutorStats executorStats = convertCommonVT(executor);
executorStats.setExecutorName(wrapper.getThreadPoolName());
executorStats.setExecutorAliasName(wrapper.getThreadPoolAliasName());
executorStats.setDynamic(executor instanceof DtpExecutor);

executorStats.setTps(performanceSnapshot.getTps());
executorStats.setAvg(performanceSnapshot.getAvg());
executorStats.setMaxRt(performanceSnapshot.getMaxRt());
executorStats.setMinRt(performanceSnapshot.getMinRt());
executorStats.setTp50(performanceSnapshot.getTp50());
executorStats.setTp75(performanceSnapshot.getTp75());
executorStats.setTp90(performanceSnapshot.getTp90());
executorStats.setTp95(performanceSnapshot.getTp95());
executorStats.setTp99(performanceSnapshot.getTp99());
executorStats.setTp999(performanceSnapshot.getTp999());
return executorStats;
}

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
ThreadPoolStats poolStats = new ThreadPoolStats();
poolStats.setCorePoolSize(executor.getCorePoolSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ private void collectMetrics(Set<String> executorNames) {
}
executorNames.forEach(x -> {
ExecutorWrapper wrapper = DtpRegistry.getExecutorWrapper(x);
if (wrapper.isVirtualThreadExecutor()) {
doCollectVTExecutor(ExecutorConverter.toVTExecutorMetrics(wrapper));
} else {
doCollect(ExecutorConverter.toMetrics(wrapper));
}
doCollect(ExecutorConverter.toMetrics(wrapper));
});
publishCollectEvent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
*
* @return 虚拟线程执行器实例
*/
@DynamicTp("VirtualThreadExecutor")
@DynamicTp("virtualThreadExecutor")
@Bean
public ExecutorService virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public TestServiceImpl(ThreadPoolExecutor jucThreadPoolExecutor,
DtpExecutor eagerDtpExecutor,
ScheduledExecutorService scheduledDtpExecutor,
OrderedDtpExecutor orderedDtpExecutor,
@Qualifier("virtualThreadExecutor1") ExecutorService virtualThreadExecutor) {
@Qualifier("virtualThreadExecutor") ExecutorService virtualThreadExecutor) {
this.jucThreadPoolExecutor = jucThreadPoolExecutor;
this.threadPoolTaskExecutor = threadPoolTaskExecutor;
this.eagerDtpExecutor = eagerDtpExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, Pr
/**
* Compatible with lower versions of Spring.
*
* @param bean the new bean instance
* @param bean the new bean instance
* @param beanName the name of the bean
* @return the bean instance to use
* @throws BeansException in case of errors
Expand Down Expand Up @@ -163,9 +163,10 @@ private Object doRegisterAndReturnCommon(Object bean, String poolName) {
Executor proxy;
if (bean instanceof ScheduledThreadPoolExecutor) {
proxy = newScheduledTpProxy(poolName, (ScheduledThreadPoolExecutor) bean);
} else if (bean.getClass().getName().equals("java.util.concurrent.ThreadPerTaskExecutor")
|| bean instanceof VirtualThreadExecutorProxy) {
} else if (bean.getClass().getName().equals("java.util.concurrent.ThreadPerTaskExecutor")) {
proxy = newVirtualThreadProxy(poolName, (ExecutorService) bean);
} else if (bean instanceof VirtualThreadExecutorProxy) {
proxy = (Executor) bean;
} else {
proxy = newProxy(poolName, (ThreadPoolExecutor) bean);
}
Expand Down Expand Up @@ -196,8 +197,7 @@ private ScheduledThreadPoolExecutorProxy newScheduledTpProxy(String name, Schedu
}

private VirtualThreadExecutorProxy newVirtualThreadProxy(String name, ExecutorService originExecutor) {
val proxy = new VirtualThreadExecutorProxy(originExecutor);
return proxy;
return new VirtualThreadExecutorProxy(originExecutor);
}

private void tryWrapTaskDecorator(String poolName, ThreadPoolTaskExecutor poolTaskExecutor, ThreadPoolExecutorProxy proxy) throws IllegalAccessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.dynamictp.common.entity.DtpExecutorProps;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.common.util.VersionUtil;
import org.dromara.dynamictp.core.executor.ExecutorType;
import org.dromara.dynamictp.core.executor.NamedThreadFactory;
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
Expand All @@ -44,7 +45,23 @@
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;

import static org.dromara.dynamictp.common.constant.DynamicTpConst.*;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.ALLOW_CORE_THREAD_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWAIT_TERMINATION_SECONDS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.AWARE_NAMES;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ENABLED;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.NOTIFY_ITEMS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLATFORM_IDS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PLUGIN_NAMES;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PRE_START_ALL_CORE_THREADS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.QUEUE_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_ENHANCED;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.REJECT_HANDLER_TYPE;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.RUN_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.TASK_WRAPPERS;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_ALIAS_NAME;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.THREAD_POOL_NAME;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.TRY_INTERRUPT_WHEN_TIMEOUT;
import static org.dromara.dynamictp.common.constant.DynamicTpConst.WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN;
import static org.dromara.dynamictp.common.em.QueueTypeEnum.buildLbq;
import static org.dromara.dynamictp.common.entity.NotifyItem.mergeAllNotifyItems;

Expand All @@ -57,6 +74,8 @@
@Slf4j
public class DtpBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {

private static final String JDK_VERSION_21 = "21";

private Environment environment;

@Override
Expand All @@ -80,45 +99,34 @@ public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, B
}
Class<?> executorTypeClass = ExecutorType.getClass(e.getExecutorType());
Map<String, Object> propertyValues;
if (executorTypeClass.equals(VirtualThreadExecutorProxy.class)) {
propertyValues = buildVTPropertyValues(e);
} else {
propertyValues = buildPropertyValues(e);
propertyValues = buildPropertyValues(e);
Object[] args;
try {
args = buildConstructorArgs(executorTypeClass, e);
} catch (UnsupportedOperationException exception) {
return;
}
Object[] args = buildConstructorArgs(executorTypeClass, e);
BeanRegistrationUtil.register(registry, e.getThreadPoolName(), executorTypeClass, propertyValues, args);
});
}

private Map<String, Object> buildVTPropertyValues(DtpExecutorProps props) {
private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {
Map<String, Object> propertyValues = Maps.newHashMap();
propertyValues.put(THREAD_POOL_NAME, props.getThreadPoolName());
propertyValues.put(THREAD_POOL_ALIAS_NAME, props.getThreadPoolAliasName());
val notifyItems = mergeAllNotifyItems(props.getNotifyItems());
propertyValues.put(NOTIFY_ITEMS, notifyItems);
propertyValues.put(PLATFORM_IDS, props.getPlatformIds());
propertyValues.put(NOTIFY_ENABLED, props.isNotifyEnabled());

val taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
propertyValues.put(TASK_WRAPPERS, taskWrappers);
propertyValues.put(PLUGIN_NAMES, props.getPluginNames());
propertyValues.put(AWARE_NAMES, props.getAwareNames());
return propertyValues;
}
if (!ExecutorType.getClass(props.getExecutorType()).equals(VirtualThreadExecutorProxy.class)) {
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
}

private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {
Map<String, Object> propertyValues = Maps.newHashMap();
propertyValues.put(THREAD_POOL_NAME, props.getThreadPoolName());
propertyValues.put(THREAD_POOL_ALIAS_NAME, props.getThreadPoolAliasName());
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut());
propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown());
propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds());
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads());
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType());
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced());
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout());
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt());
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout());
val notifyItems = mergeAllNotifyItems(props.getNotifyItems());
propertyValues.put(NOTIFY_ITEMS, notifyItems);
propertyValues.put(PLATFORM_IDS, props.getPlatformIds());
Expand All @@ -131,13 +139,17 @@ private Map<String, Object> buildPropertyValues(DtpExecutorProps props) {
return propertyValues;
}

private Object[] buildConstructorArgs(Class<?> clazz, DtpExecutorProps props) {
private Object[] buildConstructorArgs(Class<?> clazz, DtpExecutorProps props) throws UnsupportedOperationException {
BlockingQueue<Runnable> taskQueue;
if (clazz.equals(EagerDtpExecutor.class)) {
taskQueue = new TaskQueue(props.getQueueCapacity());
} else if (clazz.equals(PriorityDtpExecutor.class)) {
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator());
} else if (clazz.equals(VirtualThreadExecutorProxy.class)) {
if (!VersionUtil.getVersion().equals(JDK_VERSION_21)) {
log.warn("DynamicTp virtual thread executor {} register warn: update your JDK version or don't use virtual thread executor!", props.getThreadPoolName());
throw new UnsupportedOperationException();
}
return new Object[]{
Executors.newVirtualThreadPerTaskExecutor()
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ public List<Metrics> invoke() {
List<Metrics> metricsList = Lists.newArrayList();
DtpRegistry.getAllExecutorNames().forEach(x -> {
ExecutorWrapper wrapper = DtpRegistry.getExecutorWrapper(x);
if (wrapper.isVirtualThreadExecutor()) {
metricsList.add(ExecutorConverter.toVTExecutorMetrics(wrapper));
} else {
metricsList.add(ExecutorConverter.toMetrics(wrapper));
}
metricsList.add(ExecutorConverter.toMetrics(wrapper));
});

val handlerMap = ContextManagerHelper.getBeansOfType(MetricsAware.class);
Expand Down

0 comments on commit f449863

Please sign in to comment.