Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make dataSource and serviceFactory concurrent execution #4252

Merged
merged 14 commits into from
Jan 4, 2025
4 changes: 3 additions & 1 deletion packages/axios/src/serviceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ export class HttpServiceFactory extends ServiceFactory<AxiosInstance> {
},
};
}
await this.initClients(axiosConfig);
await this.initClients(axiosConfig, {
concurrent: true,
});
}

protected async createClient(
Expand Down
270 changes: 188 additions & 82 deletions packages/core/src/common/dataSourceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import { Types } from '../util/types';
import { DEFAULT_PATTERN, IGNORE_PATTERN } from '../constants';
import { debuglog } from 'util';
import { loadModule } from '../util';
import { ModuleLoadType, DataSourceManagerConfigOption } from '../interface';
import {
ModuleLoadType,
DataSourceManagerConfigOption,
IDataSourceManager,
BaseDataSourceManagerConfigOption,
} from '../interface';
import { Inject } from '../decorator';
import { MidwayEnvironmentService } from '../service/environmentService';
import { MidwayPriorityManager } from './priorityManager';
Expand All @@ -18,16 +23,26 @@ const debug = debuglog('midway:debug');

export abstract class DataSourceManager<
T,
ConnectionOpts extends Record<string, any> = Record<string, any>
> {
ConnectionOpts extends BaseDataSourceManagerConfigOption<
Record<string, any>,
ENTITY_CONFIG_KEY
> = BaseDataSourceManagerConfigOption<Record<string, any>, 'entities'>,
ENTITY_CONFIG_KEY extends string = 'entities'
> implements IDataSourceManager<T, ConnectionOpts>
{
protected dataSource: Map<string, T> = new Map();
protected options: DataSourceManagerConfigOption<ConnectionOpts> = {};
protected options: DataSourceManagerConfigOption<
ConnectionOpts,
ENTITY_CONFIG_KEY
> = {};
protected modelMapping = new WeakMap();
private innerDefaultDataSourceName: string;
protected dataSourcePriority: Record<string, string> = {};
// for multi client with initialization
private creatingDataSources = new Map<string, Promise<any>>();

@Inject()
protected appDir: string;
protected baseDir: string;

@Inject()
protected environmentService: MidwayEnvironmentService;
Expand All @@ -36,11 +51,15 @@ export abstract class DataSourceManager<
protected priorityManager: MidwayPriorityManager;

protected async initDataSource(
dataSourceConfig: DataSourceManagerConfigOption<ConnectionOpts>,
dataSourceConfig: DataSourceManagerConfigOption<
ConnectionOpts,
ENTITY_CONFIG_KEY
>,
baseDirOrOptions:
| {
baseDir: string;
baseDir?: string;
entitiesConfigKey?: string;
concurrent?: boolean;
}
| string
): Promise<void> {
Expand All @@ -55,23 +74,30 @@ export abstract class DataSourceManager<
baseDirOrOptions = {
baseDir: baseDirOrOptions,
entitiesConfigKey: 'entities',
concurrent: false,
};
}

for (const dataSourceName in dataSourceConfig.dataSource) {
const dataSourceOptions = dataSourceConfig.dataSource[dataSourceName];
const userEntities = dataSourceOptions[
baseDirOrOptions.entitiesConfigKey
] as any[];
const {
baseDir = this.baseDir,
entitiesConfigKey = 'entities',
concurrent,
} = baseDirOrOptions;

const processDataSource = async (
dataSourceName: string,
dataSourceOptions: any
) => {
const userEntities = dataSourceOptions[entitiesConfigKey] as any[];
if (userEntities) {
const entities = new Set();
// loop entities and glob files to model
for (const entity of userEntities) {

const processEntity = async (entity: any) => {
if (typeof entity === 'string') {
// string will be glob file
const models = await globModels(
const models = await DataSourceManager.globModels(
entity,
baseDirOrOptions.baseDir,
baseDir,
this.environmentService?.getModuleLoadType()
);
for (const model of models) {
Expand All @@ -83,21 +109,39 @@ export abstract class DataSourceManager<
entities.add(entity);
this.modelMapping.set(entity, dataSourceName);
}
};

if (concurrent) {
await Promise.all(userEntities.map(processEntity));
} else {
for (const entity of userEntities) {
await processEntity(entity);
}
}
(dataSourceOptions[baseDirOrOptions.entitiesConfigKey] as any) =
Array.from(entities);

dataSourceOptions[entitiesConfigKey] = Array.from(entities);
debug(
`[core]: DataManager load ${
dataSourceOptions[baseDirOrOptions.entitiesConfigKey].length
} models from ${dataSourceName}.`
`[core]: DataManager load ${dataSourceOptions[entitiesConfigKey].length} models from ${dataSourceName}.`
);
}

// create data source
const opts = {
cacheInstance: dataSourceConfig.cacheInstance, // will default true
cacheInstance: dataSourceConfig.cacheInstance,
validateConnection: dataSourceConfig.validateConnection,
};
await this.createInstance(dataSourceOptions, dataSourceName, opts);
return this.createInstance(dataSourceOptions, dataSourceName, opts);
};

const entries = Object.entries(dataSourceConfig.dataSource);
if (concurrent) {
await Promise.all(
entries.map(([name, options]) => processDataSource(name, options))
);
} else {
for (const [name, options] of entries) {
await processDataSource(name, options);
}
}
}

Expand Down Expand Up @@ -134,43 +178,102 @@ export abstract class DataSourceManager<
return inst ? this.checkConnected(inst) : false;
}

public async createInstance(config: ConnectionOpts): Promise<T | void>;
public async createInstance(
config: ConnectionOpts,
clientName: string,
options?: {
/**
* @deprecated
*/
validateConnection?: boolean;
/**
* @deprecated
*/
cacheInstance?: boolean | undefined;
}
): Promise<T | void>;
public async createInstance(
config: any,
clientName: any,
config: ConnectionOpts,
clientName?: any,
options?: {
validateConnection?: boolean;
cacheInstance?: boolean | undefined;
}
): Promise<T | void> {
const cache =
options && typeof options.cacheInstance === 'boolean'
? options.cacheInstance
: true;
const validateConnection = (options && options.validateConnection) || false;
if (clientName && typeof clientName !== 'string') {
options = clientName;
clientName = undefined;
}

// options.clients[id] will be merged with options.default
const configNow = extend(true, {}, this.options['default'], config);
const client = await this.createDataSource(configNow, clientName);
if (cache && clientName && client) {
this.dataSource.set(clientName, client);
if (clientName && options && options.cacheInstance === false) {
// 后面就用传不传 clientName 来判断是否缓存
clientName = undefined;
}

if (validateConnection) {
if (!client) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} initialization failed.`
);
if (clientName) {
if (this.dataSource.has(clientName)) {
return this.dataSource.get(clientName);
}

if (this.creatingDataSources.has(clientName)) {
return this.creatingDataSources.get(clientName);
}
}

const validateConnection =
config.validateConnection ??
(options && options.validateConnection) ??
false;

const connected = await this.checkConnected(client);
if (!connected) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} is not connected.`
// options.clients[id] will be merged with options.default
const configNow = extend(true, {}, this.options['default'], config);

const clientCreatingPromise = this.createDataSource(configNow, clientName);

if (clientCreatingPromise && Types.isPromise(clientCreatingPromise)) {
if (clientName) {
this.creatingDataSources.set(
clientName,
clientCreatingPromise as Promise<T>
);
}

return (clientCreatingPromise as Promise<T>)
.then(async client => {
if (clientName) {
this.dataSource.set(clientName, client);
}

if (validateConnection) {
if (!client) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} initialization failed.`
);
}

const connected = await this.checkConnected(client);
if (!connected) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} is not connected.`
);
}
}

return client;
})
.finally(() => {
if (clientName) {
this.creatingDataSources.delete(clientName);
}
});
}

return client;
// 处理同步返回的情况
if (clientName) {
this.dataSource.set(clientName, clientCreatingPromise as T);
}
return clientCreatingPromise;
}

/**
Expand Down Expand Up @@ -232,50 +335,53 @@ export abstract class DataSourceManager<
public isLowPriority(name: string) {
return this.priorityManager.isLowPriority(this.dataSourcePriority[name]);
}
}

export function formatGlobString(globString: string): string[] {
let pattern;
static formatGlobString(globString: string): string[] {
let pattern;

if (!/^\*/.test(globString)) {
globString = '/' + globString;
}
const parsePattern = parse(globString);
if (parsePattern.base && (/\*/.test(parsePattern.base) || parsePattern.ext)) {
pattern = [globString];
} else {
pattern = [...DEFAULT_PATTERN.map(p => join(globString, p))];
if (!/^\*/.test(globString)) {
globString = '/' + globString;
}
const parsePattern = parse(globString);
if (
parsePattern.base &&
(/\*/.test(parsePattern.base) || parsePattern.ext)
) {
pattern = [globString];
} else {
pattern = [...DEFAULT_PATTERN.map(p => join(globString, p))];
}
return pattern;
}
return pattern;
}

export async function globModels(
globString: string,
appDir: string,
loadMode?: ModuleLoadType
) {
const pattern = formatGlobString(globString);

const models = [];
// string will be glob file
const files = run(pattern, {
cwd: appDir,
ignore: IGNORE_PATTERN,
});
for (const file of files) {
const exports = await loadModule(file, {
loadMode,
static async globModels(
globString: string,
baseDir: string,
loadMode?: ModuleLoadType
) {
const pattern = this.formatGlobString(globString);

const models = [];
// string will be glob file
const files = run(pattern, {
cwd: baseDir,
ignore: IGNORE_PATTERN,
});
if (Types.isClass(exports)) {
models.push(exports);
} else {
for (const m in exports) {
const module = exports[m];
if (Types.isClass(module)) {
models.push(module);
for (const file of files) {
const exports = await loadModule(file, {
loadMode,
});
if (Types.isClass(exports)) {
models.push(exports);
} else {
for (const m in exports) {
const module = exports[m];
if (Types.isClass(module)) {
models.push(module);
}
}
}
}
return models;
}
return models;
}
Loading
Loading