Skip to content

Commit

Permalink
feat: make dataSource and serviceFactory concurrent execution (#4252)
Browse files Browse the repository at this point in the history
* feat: make dataSource and serviceFactory concurrent execution

* fix: lint

* fix: lint

* fix: support different mode

* fix: test

* fix: test

* fix: test

* fix: test

* docs: update

* fix: concurrent create

* docs: add more docs

* docs: add more docs

* docs: add more docs

* fix: test
  • Loading branch information
czy88840616 authored Jan 4, 2025
1 parent 856d838 commit d89cf28
Show file tree
Hide file tree
Showing 31 changed files with 1,505 additions and 218 deletions.
4 changes: 3 additions & 1 deletion packages/axios/src/serviceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ export class HttpServiceFactory extends ServiceFactory<AxiosInstance> {
},
};
}
await this.initClients(axiosConfig);
await this.initClients(axiosConfig, {
concurrent: true,
});
}

protected async createClient(
Expand Down
270 changes: 188 additions & 82 deletions packages/core/src/common/dataSourceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import { Types } from '../util/types';
import { DEFAULT_PATTERN, IGNORE_PATTERN } from '../constants';
import { debuglog } from 'util';
import { loadModule } from '../util';
import { ModuleLoadType, DataSourceManagerConfigOption } from '../interface';
import {
ModuleLoadType,
DataSourceManagerConfigOption,
IDataSourceManager,
BaseDataSourceManagerConfigOption,
} from '../interface';
import { Inject } from '../decorator';
import { MidwayEnvironmentService } from '../service/environmentService';
import { MidwayPriorityManager } from './priorityManager';
Expand All @@ -18,16 +23,26 @@ const debug = debuglog('midway:debug');

export abstract class DataSourceManager<
T,
ConnectionOpts extends Record<string, any> = Record<string, any>
> {
ConnectionOpts extends BaseDataSourceManagerConfigOption<
Record<string, any>,
ENTITY_CONFIG_KEY
> = BaseDataSourceManagerConfigOption<Record<string, any>, 'entities'>,
ENTITY_CONFIG_KEY extends string = 'entities'
> implements IDataSourceManager<T, ConnectionOpts>
{
protected dataSource: Map<string, T> = new Map();
protected options: DataSourceManagerConfigOption<ConnectionOpts> = {};
protected options: DataSourceManagerConfigOption<
ConnectionOpts,
ENTITY_CONFIG_KEY
> = {};
protected modelMapping = new WeakMap();
private innerDefaultDataSourceName: string;
protected dataSourcePriority: Record<string, string> = {};
// for multi client with initialization
private creatingDataSources = new Map<string, Promise<any>>();

@Inject()
protected appDir: string;
protected baseDir: string;

@Inject()
protected environmentService: MidwayEnvironmentService;
Expand All @@ -36,11 +51,15 @@ export abstract class DataSourceManager<
protected priorityManager: MidwayPriorityManager;

protected async initDataSource(
dataSourceConfig: DataSourceManagerConfigOption<ConnectionOpts>,
dataSourceConfig: DataSourceManagerConfigOption<
ConnectionOpts,
ENTITY_CONFIG_KEY
>,
baseDirOrOptions:
| {
baseDir: string;
baseDir?: string;
entitiesConfigKey?: string;
concurrent?: boolean;
}
| string
): Promise<void> {
Expand All @@ -55,23 +74,30 @@ export abstract class DataSourceManager<
baseDirOrOptions = {
baseDir: baseDirOrOptions,
entitiesConfigKey: 'entities',
concurrent: false,
};
}

for (const dataSourceName in dataSourceConfig.dataSource) {
const dataSourceOptions = dataSourceConfig.dataSource[dataSourceName];
const userEntities = dataSourceOptions[
baseDirOrOptions.entitiesConfigKey
] as any[];
const {
baseDir = this.baseDir,
entitiesConfigKey = 'entities',
concurrent,
} = baseDirOrOptions;

const processDataSource = async (
dataSourceName: string,
dataSourceOptions: any
) => {
const userEntities = dataSourceOptions[entitiesConfigKey] as any[];
if (userEntities) {
const entities = new Set();
// loop entities and glob files to model
for (const entity of userEntities) {

const processEntity = async (entity: any) => {
if (typeof entity === 'string') {
// string will be glob file
const models = await globModels(
const models = await DataSourceManager.globModels(
entity,
baseDirOrOptions.baseDir,
baseDir,
this.environmentService?.getModuleLoadType()
);
for (const model of models) {
Expand All @@ -83,21 +109,39 @@ export abstract class DataSourceManager<
entities.add(entity);
this.modelMapping.set(entity, dataSourceName);
}
};

if (concurrent) {
await Promise.all(userEntities.map(processEntity));
} else {
for (const entity of userEntities) {
await processEntity(entity);
}
}
(dataSourceOptions[baseDirOrOptions.entitiesConfigKey] as any) =
Array.from(entities);

dataSourceOptions[entitiesConfigKey] = Array.from(entities);
debug(
`[core]: DataManager load ${
dataSourceOptions[baseDirOrOptions.entitiesConfigKey].length
} models from ${dataSourceName}.`
`[core]: DataManager load ${dataSourceOptions[entitiesConfigKey].length} models from ${dataSourceName}.`
);
}

// create data source
const opts = {
cacheInstance: dataSourceConfig.cacheInstance, // will default true
cacheInstance: dataSourceConfig.cacheInstance,
validateConnection: dataSourceConfig.validateConnection,
};
await this.createInstance(dataSourceOptions, dataSourceName, opts);
return this.createInstance(dataSourceOptions, dataSourceName, opts);
};

const entries = Object.entries(dataSourceConfig.dataSource);
if (concurrent) {
await Promise.all(
entries.map(([name, options]) => processDataSource(name, options))
);
} else {
for (const [name, options] of entries) {
await processDataSource(name, options);
}
}
}

Expand Down Expand Up @@ -134,43 +178,102 @@ export abstract class DataSourceManager<
return inst ? this.checkConnected(inst) : false;
}

public async createInstance(config: ConnectionOpts): Promise<T | void>;
public async createInstance(
config: ConnectionOpts,
clientName: string,
options?: {
/**
* @deprecated
*/
validateConnection?: boolean;
/**
* @deprecated
*/
cacheInstance?: boolean | undefined;
}
): Promise<T | void>;
public async createInstance(
config: any,
clientName: any,
config: ConnectionOpts,
clientName?: any,
options?: {
validateConnection?: boolean;
cacheInstance?: boolean | undefined;
}
): Promise<T | void> {
const cache =
options && typeof options.cacheInstance === 'boolean'
? options.cacheInstance
: true;
const validateConnection = (options && options.validateConnection) || false;
if (clientName && typeof clientName !== 'string') {
options = clientName;
clientName = undefined;
}

// options.clients[id] will be merged with options.default
const configNow = extend(true, {}, this.options['default'], config);
const client = await this.createDataSource(configNow, clientName);
if (cache && clientName && client) {
this.dataSource.set(clientName, client);
if (clientName && options && options.cacheInstance === false) {
// 后面就用传不传 clientName 来判断是否缓存
clientName = undefined;
}

if (validateConnection) {
if (!client) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} initialization failed.`
);
if (clientName) {
if (this.dataSource.has(clientName)) {
return this.dataSource.get(clientName);
}

if (this.creatingDataSources.has(clientName)) {
return this.creatingDataSources.get(clientName);
}
}

const validateConnection =
config.validateConnection ??
(options && options.validateConnection) ??
false;

const connected = await this.checkConnected(client);
if (!connected) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} is not connected.`
// options.clients[id] will be merged with options.default
const configNow = extend(true, {}, this.options['default'], config);

const clientCreatingPromise = this.createDataSource(configNow, clientName);

if (clientCreatingPromise && Types.isPromise(clientCreatingPromise)) {
if (clientName) {
this.creatingDataSources.set(
clientName,
clientCreatingPromise as Promise<T>
);
}

return (clientCreatingPromise as Promise<T>)
.then(async client => {
if (clientName) {
this.dataSource.set(clientName, client);
}

if (validateConnection) {
if (!client) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} initialization failed.`
);
}

const connected = await this.checkConnected(client);
if (!connected) {
throw new MidwayCommonError(
`[DataSourceManager] ${clientName} is not connected.`
);
}
}

return client;
})
.finally(() => {
if (clientName) {
this.creatingDataSources.delete(clientName);
}
});
}

return client;
// 处理同步返回的情况
if (clientName) {
this.dataSource.set(clientName, clientCreatingPromise as T);
}
return clientCreatingPromise;
}

/**
Expand Down Expand Up @@ -232,50 +335,53 @@ export abstract class DataSourceManager<
public isLowPriority(name: string) {
return this.priorityManager.isLowPriority(this.dataSourcePriority[name]);
}
}

export function formatGlobString(globString: string): string[] {
let pattern;
static formatGlobString(globString: string): string[] {
let pattern;

if (!/^\*/.test(globString)) {
globString = '/' + globString;
}
const parsePattern = parse(globString);
if (parsePattern.base && (/\*/.test(parsePattern.base) || parsePattern.ext)) {
pattern = [globString];
} else {
pattern = [...DEFAULT_PATTERN.map(p => join(globString, p))];
if (!/^\*/.test(globString)) {
globString = '/' + globString;
}
const parsePattern = parse(globString);
if (
parsePattern.base &&
(/\*/.test(parsePattern.base) || parsePattern.ext)
) {
pattern = [globString];
} else {
pattern = [...DEFAULT_PATTERN.map(p => join(globString, p))];
}
return pattern;
}
return pattern;
}

export async function globModels(
globString: string,
appDir: string,
loadMode?: ModuleLoadType
) {
const pattern = formatGlobString(globString);

const models = [];
// string will be glob file
const files = run(pattern, {
cwd: appDir,
ignore: IGNORE_PATTERN,
});
for (const file of files) {
const exports = await loadModule(file, {
loadMode,
static async globModels(
globString: string,
baseDir: string,
loadMode?: ModuleLoadType
) {
const pattern = this.formatGlobString(globString);

const models = [];
// string will be glob file
const files = run(pattern, {
cwd: baseDir,
ignore: IGNORE_PATTERN,
});
if (Types.isClass(exports)) {
models.push(exports);
} else {
for (const m in exports) {
const module = exports[m];
if (Types.isClass(module)) {
models.push(module);
for (const file of files) {
const exports = await loadModule(file, {
loadMode,
});
if (Types.isClass(exports)) {
models.push(exports);
} else {
for (const m in exports) {
const module = exports[m];
if (Types.isClass(module)) {
models.push(module);
}
}
}
}
return models;
}
return models;
}
Loading

0 comments on commit d89cf28

Please sign in to comment.