From afedcccaffe1655101470cbeebd42b8ec6eba684 Mon Sep 17 00:00:00 2001 From: Anton Vikulov Date: Sun, 29 Oct 2023 21:24:29 +0500 Subject: [PATCH] ues pool --- scripts/build.cli.js | 3 +- src/cmd/build/index.ts | 42 +++--- src/constants.ts | 5 +- src/resolvers/processPage.ts | 11 +- src/steps/processLinter.ts | 102 +++----------- src/steps/processPages.ts | 163 ++++------------------ src/steps/processPool.ts | 102 ++++++++++++++ src/utils/file.ts | 17 ++- src/utils/workers.ts | 4 +- src/workers/linter/index.ts | 66 --------- src/workers/{transform => }/mainBridge.ts | 2 +- src/workers/pool/index.ts | 130 +++++++++++++++++ src/workers/transform/index.ts | 101 -------------- 13 files changed, 326 insertions(+), 422 deletions(-) create mode 100644 src/steps/processPool.ts delete mode 100644 src/workers/linter/index.ts rename src/workers/{transform => }/mainBridge.ts (98%) create mode 100644 src/workers/pool/index.ts delete mode 100644 src/workers/transform/index.ts diff --git a/scripts/build.cli.js b/scripts/build.cli.js index 999baaf7b..be03ac200 100644 --- a/scripts/build.cli.js +++ b/scripts/build.cli.js @@ -23,8 +23,7 @@ const commonConfig = { const builds = [ [['src/index.ts'], 'build/index.js'], - [['src/workers/linter/index.ts'], 'build/linter.js'], - [['src/workers/transform/index.ts'], 'build/transform.js'], + [['src/workers/pool/index.ts'], 'build/pool.js'], ]; Promise.all(builds.map(([entries, outfile]) => { diff --git a/src/cmd/build/index.ts b/src/cmd/build/index.ts index 70a7a0bed..d3a6a7a77 100644 --- a/src/cmd/build/index.ts +++ b/src/cmd/build/index.ts @@ -13,14 +13,13 @@ import {join, resolve} from 'path'; import {ArgvService, Includers} from '../../services'; import OpenapiIncluder from '@diplodoc/openapi-extension/includer'; import { - initLinterWorkers, - initProcessWorkers, processAssets, processExcludedFiles, processLinter, processLogs, processPages, processServiceFiles, + saveSinglePages, } from '../../steps'; import {prepareMapFile} from '../../steps/processMapFile'; import shell from 'shelljs'; @@ -29,6 +28,12 @@ import {copyFiles, logger} from '../../utils'; import {upload as publishFilesToS3} from '../publish/upload'; import glob from 'glob'; import {createVCSConnector} from '../../vcs-connector'; +import { + finishProcessPool, + getPoolEnv, + initProcessPool, + terminateProcessPool, +} from '../../steps/processPool'; export const build = { command: ['build', '$0'], @@ -199,6 +204,7 @@ async function handler(args: Arguments) { addMapFile, allowCustomResources, resources, + singlePage, } = ArgvService.getConfig(); preparingTemporaryFolders(userOutputFolder); @@ -215,28 +221,29 @@ async function handler(args: Arguments) { const pathToRedirects = join(args.input, REDIRECTS_FILENAME); const pathToLintConfig = join(args.input, LINT_CONFIG_FILENAME); - const promises: Promise[] = []; - const vcsConnector = createVCSConnector(); - - if (!lintDisabled) { - /* Initialize workers in advance to avoid a timeout failure due to not receiving a message from them */ - await initLinterWorkers(); - promises.push(processLinter()); - } - if (vcsConnector) { - promises.push(vcsConnector.init()); + await vcsConnector.init(); } - await Promise.all(promises); + await initProcessPool({vcsConnector}); - if (!buildDisabled) { - await initProcessWorkers(); - await processPages(vcsConnector); + try { + await Promise.all([ + !lintDisabled && processLinter(), + !buildDisabled && processPages(vcsConnector), + ]); + await finishProcessPool(); + } finally { + await terminateProcessPool(); } if (!buildDisabled) { + if (singlePage) { + const {singlePageResults} = getPoolEnv(); + await saveSinglePages(outputBundlePath, singlePageResults); + } + // process additional files switch (outputFormat) { case 'html': @@ -314,7 +321,6 @@ function preparingTemporaryFolders(userOutputFolder: string) { // Create temporary input/output folders shell.rm('-rf', args.input, args.output); shell.mkdir(args.input, args.output); - shell.chmod('-R', 'u+w', args.input); copyFiles( args.rootInput, @@ -326,4 +332,6 @@ function preparingTemporaryFolders(userOutputFolder: string) { ignore: ['node_modules/**', '*/node_modules/**'], }), ); + + shell.chmod('-R', 'u+w', args.input); } diff --git a/src/constants.ts b/src/constants.ts index 8628adafa..5ddc7fec7 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -116,7 +116,8 @@ export const REGEXP_INCLUDE_FILE_PATH = /(?<=[(]).+(?=[)])/g; // Regexp result: authorLogin export const REGEXP_AUTHOR = /(?<=author:\s).+(?=\r?\n)/g; -export const MIN_CHUNK_SIZE = Number(process.env.MIN_CHUNK_SIZE) || 100; -export const WORKERS_COUNT = Number(process.env.WORKERS_COUNT) || os.cpus().length - 1; +export const MIN_CHUNK_SIZE = Number(process.env.MIN_CHUNK_SIZE) || 10; +export const THREAD_PART_COUNT = Number(process.env.THREAD_PART_COUNT) || 8; +export const WORKERS_COUNT = Number(process.env.WORKERS_COUNT) || os.cpus().length - 1 || 1; export const metadataBorder = '---'; diff --git a/src/resolvers/processPage.ts b/src/resolvers/processPage.ts index ce82acea8..14bb55414 100644 --- a/src/resolvers/processPage.ts +++ b/src/resolvers/processPage.ts @@ -11,6 +11,7 @@ import * as fs from 'fs'; import {dump, load} from 'js-yaml'; import {resolveMd2Md} from './md2md'; import {resolveMd2HTML} from './md2html'; +import shell from 'shelljs'; let singlePageResults: Record; let singlePagePaths: Record>; @@ -177,7 +178,7 @@ async function preparingPagesByOutputFormat( (outputFormat === 'md' && isYamlFileExtension) || (outputFormat === 'html' && !isYamlFileExtension && fileExtension !== '.md') ) { - await copyFileWithoutChanges(resolvedPathToFile, outputDir, filename); + copyFileWithoutChanges(resolvedPathToFile, outputDir, filename); return; } @@ -215,15 +216,11 @@ async function processingYamlFile(path: PathData, metaDataOptions: MetaDataOptio await fs.promises.writeFile(resolve(outputFolderPath, pathToFile), dump(parsedContent)); } -async function copyFileWithoutChanges( - resolvedPathToFile: string, - outputDir: string, - filename: string, -) { +function copyFileWithoutChanges(resolvedPathToFile: string, outputDir: string, filename: string) { const from = resolvedPathToFile; const to = resolve(outputDir, filename); - await fs.promises.copyFile(from, to); + shell.cp(from, to); } async function processingFileToMd(path: PathData, metaDataOptions: MetaDataOptions): Promise { diff --git a/src/steps/processLinter.ts b/src/steps/processLinter.ts index 313d500d3..d27fd191e 100644 --- a/src/steps/processLinter.ts +++ b/src/steps/processLinter.ts @@ -1,99 +1,41 @@ -import log from '@diplodoc/transform/lib/log'; -import {spawn, Worker, Thread} from 'threads'; +import {PluginService, TocService} from '../services'; +import {getChunkSize} from '../utils/workers'; +import {runPool} from './processPool'; +import {chunk} from 'lodash'; +import {lintPage} from '../resolvers'; import {extname} from 'path'; - -import {ArgvService, TocService, PresetService, PluginService} from '../services'; -import {ProcessLinterWorker} from '../workers/linter'; import {logger} from '../utils'; -import {LINTING_FINISHED, WORKERS_COUNT, MIN_CHUNK_SIZE} from '../constants'; -import {lintPage} from '../resolvers'; -import {splitOnChunks} from '../utils/worker'; -import {getChunkSize} from '../utils/workers'; - -let processLinterWorkers: (ProcessLinterWorker & Thread)[]; -let navigationPathsChunks: string[][]; +import {LINTING_FINISHED} from '../constants'; export async function processLinter(): Promise { - const argvConfig = ArgvService.getConfig(); - const navigationPaths = TocService.getNavigationPaths(); - if (!processLinterWorkers) { - lintPagesFallback(navigationPaths); - + if (process.env.DISABLE_PARALLEL_BUILD) { + await lintPagesFallback(navigationPaths); return; } - const presetStorageDump = PresetService.dump(); - - /* Subscribe on the linted page event */ - processLinterWorkers.forEach((worker) => { - worker.getProcessedPages().subscribe((pathToFile) => { - logger.info(pathToFile as string, LINTING_FINISHED); - }); - }); - - /* Run processing the linter */ - await Promise.all( - processLinterWorkers.map((worker, i) => { - const navigationPathsChunk = navigationPathsChunks[i]; - - return worker.run({ - argvConfig, - navigationPaths: navigationPathsChunk, - presetStorageDump, - }); - }), - ); - - /* Unsubscribe from workers */ - await Promise.all( - processLinterWorkers.map((worker) => { - return worker.finish().then((logs) => { - log.add(logs); - }); - }), - ); + const navigationPathsChunks = chunk(navigationPaths, getChunkSize(navigationPaths)); - /* Terminate workers */ await Promise.all( - processLinterWorkers.map((worker) => { - return Thread.terminate(worker); + navigationPathsChunks.map(async (navigationPathsChunk) => { + await runPool('lint', navigationPathsChunk); }), ); } -export async function initLinterWorkers() { - const navigationPaths = TocService.getNavigationPaths() - .filter((filename) => extname(filename) === '.md'); - const chunkSize = getChunkSize(navigationPaths); - - if (process.env.DISABLE_PARALLEL_BUILD || chunkSize < MIN_CHUNK_SIZE || WORKERS_COUNT <= 0) { - return; - } - - navigationPathsChunks = splitOnChunks(navigationPaths, chunkSize).filter((arr) => arr.length); - - const workersCount = navigationPathsChunks.length; +async function lintPagesFallback(navigationPaths: string[]) { + PluginService.setPlugins(); - processLinterWorkers = await Promise.all( - new Array(workersCount).fill(null).map(() => { - // TODO: get linter path from env - return spawn(new Worker('./linter'), {timeout: 60000}); + await Promise.all( + navigationPaths.map(async (pathToFile) => { + await lintPage({ + inputPath: pathToFile, + fileExtension: extname(pathToFile), + onFinish: () => { + logger.info(pathToFile, LINTING_FINISHED); + }, + }); }), ); } - -function lintPagesFallback(navigationPaths: string[]) { - PluginService.setPlugins(); - - navigationPaths.forEach((pathToFile) => { - lintPage({ - inputPath: pathToFile, - fileExtension: extname(pathToFile), - onFinish: () => { - logger.info(pathToFile, LINTING_FINISHED); - }, - }); - }); -} diff --git a/src/steps/processPages.ts b/src/steps/processPages.ts index 8803f3f46..f8dfcaf39 100644 --- a/src/steps/processPages.ts +++ b/src/steps/processPages.ts @@ -1,135 +1,39 @@ import {join, relative, resolve} from 'path'; import {writeFileSync} from 'fs'; -import log from '@diplodoc/transform/lib/log'; - -import {ArgvService, PluginService, PresetService, TocService} from '../services'; -import { - generateStaticMarkup, - joinSinglePageResults, - logger, - transformTocForSinglePage, -} from '../utils'; +import {ArgvService, PluginService, TocService} from '../services'; +import {generateStaticMarkup, joinSinglePageResults, transformTocForSinglePage} from '../utils'; import {SinglePageResult, YfmToc} from '../models'; -import { - BUNDLE_FOLDER, - Lang, - MIN_CHUNK_SIZE, - PROCESSING_FINISHED, - SINGLE_PAGE_DATA_FILENAME, - SINGLE_PAGE_FILENAME, - WORKERS_COUNT, -} from '../constants'; -import {splitOnChunks} from '../utils/worker'; -import {spawn, Thread, Worker} from 'threads'; +import {Lang, SINGLE_PAGE_DATA_FILENAME, SINGLE_PAGE_FILENAME} from '../constants'; import {getChunkSize} from '../utils/workers'; -import {ProcessTransformWorker} from '../workers/transform'; -import {processPage} from '../resolvers/processPage'; -import {asyncify, mapLimit} from 'async'; +import {getPoolEnv, runPool} from './processPool'; +import {chunk} from 'lodash'; import {VCSConnector} from '../vcs-connector/connector-models'; -import {MainBridge, SendPayload} from '../workers/transform/mainBridge'; - -const singlePageResults: Record = {}; -const singlePagePaths: Record> = {}; - -let processTransformWorkers: (ProcessTransformWorker & Thread)[]; -let navigationPathsChunks: string[][]; +import {asyncify, mapLimit} from 'async'; +import {processPage} from '../resolvers/processPage'; // Processes files of documentation (like index.yaml, *.md) export async function processPages(vcsConnector?: VCSConnector): Promise { - const argvConfig = ArgvService.getConfig(); - - const {output: outputFolderPath, singlePage} = argvConfig; - - const outputBundlePath = join(outputFolderPath, BUNDLE_FOLDER); - - if (!processTransformWorkers) { - await processPagesFallback(vcsConnector, TocService.getNavigationPaths()); - } else { - const presetStorageDump = PresetService.dump(); - const tocServiceDataDump = TocService.dump(); - const vcsConnectorDump = vcsConnector?.dump(); - - const scope = { - vcsConnector: vcsConnector, - }; - - processTransformWorkers.forEach((worker) => { - worker.getProcessedPages().subscribe(async (payload) => { - const data = payload as {type: 'path'; path: string} | SendPayload; - if (data.type === 'path') { - logger.info(data.path, PROCESSING_FINISHED); - return; - } - - if (data.type === 'call') { - await MainBridge.handleCall(worker, data, scope); - } - }); - }); - - /* Run processing the linter */ - await Promise.all( - processTransformWorkers.map((worker, i) => { - const navigationPathsChunk = navigationPathsChunks[i]; - - return worker.run({ - argvConfig, - navigationPaths: navigationPathsChunk, - presetStorageDump, - tocServiceDataDump, - vcsConnectorDump, - }); - }), - ); - - /* Unsubscribe from workers */ - await Promise.all( - processTransformWorkers.map((worker) => { - return worker - .finish() - .then( - ({ - logs, - singlePagePathsSerialized, - singlePageResults: singlePageResultsLocal, - }) => { - Object.entries(singlePageResultsLocal).forEach(([key, values]) => { - let arr = singlePageResults[key]; - if (!arr) { - arr = singlePageResults[key] = []; - } - arr.push(...values); - }); - - Object.entries(singlePagePathsSerialized).forEach(([key, values]) => { - let setMap = singlePagePaths[key]; - if (!setMap) { - setMap = new Set(); - singlePagePaths[key] = setMap; - } - values.forEach((p) => setMap.add(p)); - }); - log.add(logs); - }, - ); - }), - ); + const navigationPaths = TocService.getNavigationPaths(); - /* Terminate workers */ - await Promise.all( - processTransformWorkers.map((worker) => { - return Thread.terminate(worker); - }), - ); + if (process.env.DISABLE_PARALLEL_BUILD) { + await processPagesFallback(vcsConnector, navigationPaths); + return; } - if (singlePage) { - await saveSinglePages(outputBundlePath); - } + const navigationPathsChunks = chunk(navigationPaths, getChunkSize(navigationPaths)); + + await Promise.all( + navigationPathsChunks.map(async (navigationPathsChunk) => { + await runPool('transform', navigationPathsChunk); + }), + ); } -async function saveSinglePages(outputBundlePath: string) { +export async function saveSinglePages( + outputBundlePath: string, + singlePageResults: Record, +) { const { input: inputFolderPath, output: outputFolderPath, @@ -187,32 +91,15 @@ async function saveSinglePages(outputBundlePath: string) { } } -export async function initProcessWorkers() { - const navigationPaths = TocService.getNavigationPaths(); - const chunkSize = getChunkSize(navigationPaths); - - if (process.env.DISABLE_PARALLEL_BUILD || chunkSize < MIN_CHUNK_SIZE || WORKERS_COUNT <= 0) { - return; - } - - navigationPathsChunks = splitOnChunks(navigationPaths, chunkSize).filter((arr) => arr.length); - - const workersCount = navigationPathsChunks.length; - - processTransformWorkers = await Promise.all( - new Array(workersCount).fill(null).map(() => { - // TODO: get transform path from env - return spawn(new Worker('./transform'), {timeout: 60000}); - }), - ); -} - async function processPagesFallback( vcsConnector: VCSConnector | undefined, navigationPaths: string[], ) { PluginService.setPlugins(); + const {singlePageResults} = getPoolEnv(); + const singlePagePaths: Record> = {}; + const concurrency = 500; await mapLimit( diff --git a/src/steps/processPool.ts b/src/steps/processPool.ts new file mode 100644 index 000000000..96c2f0b8e --- /dev/null +++ b/src/steps/processPool.ts @@ -0,0 +1,102 @@ +import {ModuleThread, Pool, spawn, Thread, Worker} from 'threads'; +import {ProcessPoolWorker} from '../workers/pool'; +import {ArgvService, PresetService, TocService} from '../services'; +import {WORKERS_COUNT} from '../constants'; +import {VCSConnector} from '../vcs-connector/connector-models'; +import log from '@diplodoc/transform/lib/log'; +import {SinglePageResult} from '../models'; +import {logger} from '../utils'; +import {MainBridge} from '../workers/mainBridge'; + +const poolEnv = { + singlePageResults: {} as Record, +}; + +const workers: (ProcessPoolWorker & Thread)[] = []; +let pool: Pool> | undefined; + +interface InitProcessPoolProps { + vcsConnector?: VCSConnector; +} + +export async function initProcessPool({vcsConnector}: InitProcessPoolProps) { + const argvConfig = ArgvService.getConfig(); + const presetStorageDump = PresetService.dump(); + const tocServiceDataDump = TocService.dump(); + const vcsConnectorDump = vcsConnector?.dump(); + + const scope = { + vcsConnector: vcsConnector, + }; + + // eslint-disable-next-line new-cap + pool = Pool(async () => { + const worker = await spawn(new Worker('./pool'), {timeout: 60000}); + await worker.init({ + argvConfig, + presetStorageDump, + tocServiceDataDump, + vcsConnectorDump, + }); + worker.getSubject().subscribe(async (payload) => { + switch (payload.type) { + case 'path': { + logger.info(payload.path, payload.message); + break; + } + case 'call': { + await MainBridge.handleCall(worker, payload, scope); + break; + } + } + }); + workers.push(worker); + return worker; + }, WORKERS_COUNT); +} + +export async function runPool(type: 'lint' | 'transform', navigationPaths: string[]) { + if (!pool) { + throw new Error('Pool is not initiated'); + } + + return pool.queue(async (worker) => { + if (type === 'lint') { + await worker.lint({navigationPaths}); + } else { + await worker.transform({navigationPaths}); + } + }); +} + +export async function finishProcessPool() { + await Promise.all( + workers.map(async (worker) => { + const {logs, singlePageResults: singlePageResultsLocal} = await worker.finish(); + + Object.entries(singlePageResultsLocal).forEach(([key, values]) => { + let arr = poolEnv.singlePageResults[key]; + if (!arr) { + arr = poolEnv.singlePageResults[key] = []; + } + arr.push(...values); + }); + + log.add(logs); + }), + ); +} + +export async function terminateProcessPool() { + if (!pool) { + throw new Error('Pool is not initiated'); + } + + workers.splice(0); + await pool.terminate(true); + pool = undefined; +} + +export function getPoolEnv() { + return poolEnv; +} diff --git a/src/utils/file.ts b/src/utils/file.ts index 021f1dad6..c599f84fb 100644 --- a/src/utils/file.ts +++ b/src/utils/file.ts @@ -1,6 +1,5 @@ import {dirname, resolve} from 'path'; import shell from 'shelljs'; -import {copyFileSync} from 'fs'; import {logger} from './logger'; export function copyFiles( @@ -8,14 +7,20 @@ export function copyFiles( outputFolderPath: string, files: string[], ): void { - for (const pathToAsset of files) { - const outputDir: string = resolve(outputFolderPath, dirname(pathToAsset)); + const dirs = new Set(); + + files.forEach((pathToAsset) => { + const outputDir = resolve(outputFolderPath, dirname(pathToAsset)); const from = resolve(inputFolderPath, pathToAsset); const to = resolve(outputFolderPath, pathToAsset); - shell.mkdir('-p', outputDir); - copyFileSync(from, to); + if (!dirs.has(outputDir)) { + dirs.add(outputDir); + shell.mkdir('-p', outputDir); + } + + shell.cp(from, to); logger.copy(pathToAsset); - } + }); } diff --git a/src/utils/workers.ts b/src/utils/workers.ts index 433003444..5583d5ad3 100644 --- a/src/utils/workers.ts +++ b/src/utils/workers.ts @@ -1,5 +1,5 @@ -import {WORKERS_COUNT} from '../constants'; +import {MIN_CHUNK_SIZE, THREAD_PART_COUNT, WORKERS_COUNT} from '../constants'; export function getChunkSize(arr: string[]) { - return Math.ceil(arr.length / WORKERS_COUNT); + return Math.max(Math.ceil(arr.length / WORKERS_COUNT / THREAD_PART_COUNT), MIN_CHUNK_SIZE); } diff --git a/src/workers/linter/index.ts b/src/workers/linter/index.ts deleted file mode 100644 index 3f3c481bc..000000000 --- a/src/workers/linter/index.ts +++ /dev/null @@ -1,66 +0,0 @@ -import log from '@diplodoc/transform/lib/log'; -import {extname} from 'path'; -import {Observable, Subject} from 'threads/observable'; -import {expose} from 'threads'; - -import {ArgvService, PluginService, PresetService, TocService} from '../../services'; -import {TocServiceData} from '../../services/tocs'; -import {PresetStorageDump} from '../../services/preset'; -import {YfmArgv} from '../../models'; -import {lintPage} from '../../resolvers'; -import {asyncify, mapLimit} from 'async'; - -let processedPages = new Subject(); - -interface ProcessLinterWorkerOptions { - argvConfig: YfmArgv; - navigationPaths: TocServiceData['navigationPaths']; - presetStorageDump: PresetStorageDump; -} - -async function run({argvConfig, navigationPaths, presetStorageDump}: ProcessLinterWorkerOptions) { - ArgvService.set(argvConfig); - PresetService.load(presetStorageDump); - TocService.setNavigationPaths(navigationPaths); - - PluginService.setPlugins(); - - const concurrency = 500; - - await mapLimit( - navigationPaths, - concurrency, - asyncify(async (pathToFile: string) => { - await lintPage({ - inputPath: pathToFile, - fileExtension: extname(pathToFile), - onFinish: () => { - processedPages.next(pathToFile); - }, - }); - }), - ); -} - -async function finish() { - processedPages.complete(); - processedPages = new Subject(); - - return log.get(); -} - -function getProcessedPages() { - return Observable.from(processedPages); -} - -export type ProcessLinterWorker = { - run: typeof run; - finish: typeof finish; - getProcessedPages: typeof getProcessedPages; -}; - -expose({ - run, - finish, - getProcessedPages, -}); diff --git a/src/workers/transform/mainBridge.ts b/src/workers/mainBridge.ts similarity index 98% rename from src/workers/transform/mainBridge.ts rename to src/workers/mainBridge.ts index 99bc375f3..e3432684b 100644 --- a/src/workers/transform/mainBridge.ts +++ b/src/workers/mainBridge.ts @@ -1,4 +1,4 @@ -type EmitFn = (data: unknown) => void; +type EmitFn = (data: SendPayload) => void; type Callback = (payload: ReceivePayload) => void; type ReplyFn = (id: number, payload: ReceivePayload) => void; diff --git a/src/workers/pool/index.ts b/src/workers/pool/index.ts new file mode 100644 index 000000000..f1fd7ccee --- /dev/null +++ b/src/workers/pool/index.ts @@ -0,0 +1,130 @@ +import {SinglePageResult, YfmArgv} from '../../models'; +import {TocServiceDataDump} from '../../services/tocs'; +import {PresetStorageDump} from '../../services/preset'; +import {VCSConnector, VCSConnectorDump} from '../../vcs-connector/connector-models'; +import {ArgvService, PluginService, PresetService, TocService} from '../../services'; +import {createVCSConnector} from '../../vcs-connector'; +import {Observable, Subject} from 'threads/observable'; +import {MainBridge, ReceivePayload, SendPayload} from '../mainBridge'; +import {expose} from 'threads'; +import {asyncify, mapLimit} from 'async'; +import {lintPage} from '../../resolvers'; +import {extname} from 'path'; +import log from '@diplodoc/transform/lib/log'; +import {processPage} from '../../resolvers/processPage'; +import {LINTING_FINISHED, PROCESSING_FINISHED} from '../../constants'; + +const concurrency = 500; + +export type PoolSubjectPayload = {type: 'path'; path: string; message: string} | SendPayload; + +let subject = new Subject(); + +const singlePageResults: Record = {}; +let vcsConnector: VCSConnector | undefined; + +const mainBridge = new MainBridge((payload) => subject.next(payload)); + +interface ProcessPoolWorkerOptions { + argvConfig: YfmArgv; + presetStorageDump: PresetStorageDump; + tocServiceDataDump: TocServiceDataDump; + vcsConnectorDump?: VCSConnectorDump; +} + +async function init({ + argvConfig, + vcsConnectorDump, + tocServiceDataDump, + presetStorageDump, +}: ProcessPoolWorkerOptions) { + ArgvService.set(argvConfig); + PresetService.load(presetStorageDump); + TocService.load(tocServiceDataDump); + + vcsConnector = createVCSConnector(); + if (vcsConnectorDump && vcsConnector) { + vcsConnector.load(vcsConnectorDump); + vcsConnector.getUserByLogin = mainBridge.createFn( + 'vcsConnector.getUserByLogin', + ); + } + + PluginService.setPlugins(); +} + +interface LintProps { + navigationPaths: string[]; +} + +async function lint({navigationPaths}: LintProps) { + await mapLimit( + navigationPaths, + concurrency, + asyncify(async (pathToFile: string) => { + await lintPage({ + inputPath: pathToFile, + fileExtension: extname(pathToFile), + onFinish: () => { + subject.next({type: 'path', path: pathToFile, message: LINTING_FINISHED}); + }, + }); + }), + ); +} + +interface TransformProps { + navigationPaths: string[]; +} + +async function transform({navigationPaths}: TransformProps) { + const singlePagePaths: Record> = {}; + + await mapLimit( + navigationPaths, + concurrency, + asyncify(async (pathToFile: string) => { + await processPage({ + pathToFile, + vcsConnector, + singlePageResults, + singlePagePaths, + }).finally(() => { + subject.next({type: 'path', path: pathToFile, message: PROCESSING_FINISHED}); + }); + }), + ); +} + +async function finish() { + subject.complete(); + subject = new Subject(); + + return {logs: log.get(), singlePageResults}; +} + +function getSubject() { + return Observable.from(subject); +} + +function reply(id: number, payload: ReceivePayload) { + mainBridge.handleReply(id, payload); +} + +export type ProcessPoolWorker = { + init: typeof init; + lint: typeof lint; + transform: typeof transform; + getSubject: typeof getSubject; + reply: typeof reply; + finish: typeof finish; +}; + +expose({ + init, + lint, + transform, + getSubject, + reply, + finish, +}); diff --git a/src/workers/transform/index.ts b/src/workers/transform/index.ts deleted file mode 100644 index 375403f18..000000000 --- a/src/workers/transform/index.ts +++ /dev/null @@ -1,101 +0,0 @@ -import log from '@diplodoc/transform/lib/log'; -import {Observable, Subject} from 'threads/observable'; -import {expose} from 'threads'; - -import {ArgvService, PluginService, PresetService, TocService} from '../../services'; -import {TocServiceData, TocServiceDataDump} from '../../services/tocs'; -import {PresetStorageDump} from '../../services/preset'; -import {SinglePageResult, YfmArgv} from '../../models'; -import {asyncify, mapLimit} from 'async'; -import {processPage} from '../../resolvers/processPage'; -import {VCSConnectorDump} from '../../vcs-connector/connector-models'; -import {createVCSConnector} from '../../vcs-connector'; -import {MainBridge, ReceivePayload} from './mainBridge'; - -let processedPages = new Subject(); - -const mainBridge = new MainBridge((payload) => processedPages.next(payload)); - -const singlePageResults: Record = {}; -const singlePagePaths: Record> = {}; - -interface ProcessProcessWorkerOptions { - argvConfig: YfmArgv; - navigationPaths: TocServiceData['navigationPaths']; - presetStorageDump: PresetStorageDump; - tocServiceDataDump: TocServiceDataDump; - vcsConnectorDump?: VCSConnectorDump; -} - -async function run({ - argvConfig, - navigationPaths, - presetStorageDump, - tocServiceDataDump, - vcsConnectorDump, -}: ProcessProcessWorkerOptions) { - ArgvService.set(argvConfig); - PresetService.load(presetStorageDump); - TocService.load(tocServiceDataDump); - - const vcsConnector = createVCSConnector(); - if (vcsConnectorDump && vcsConnector) { - vcsConnector.load(vcsConnectorDump); - vcsConnector.getUserByLogin = mainBridge.createFn( - 'vcsConnector.getUserByLogin', - ); - } - - PluginService.setPlugins(); - - const concurrency = 500; - - await mapLimit( - navigationPaths, - concurrency, - asyncify(async (pathToFile: string) => { - await processPage({ - pathToFile, - vcsConnector, - singlePageResults, - singlePagePaths, - }).finally(() => { - processedPages.next({type: 'path', path: pathToFile}); - }); - }), - ); -} - -async function finish() { - processedPages.complete(); - processedPages = new Subject(); - - const singlePagePathsSerialized: Record = {}; - Object.entries(singlePagePaths).forEach(([key, valueSet]) => { - singlePagePathsSerialized[key] = Array.from(valueSet.keys()); - }); - - return {logs: log.get(), singlePagePathsSerialized, singlePageResults}; -} - -function getProcessedPages() { - return Observable.from(processedPages); -} - -function reply(id: number, payload: ReceivePayload) { - mainBridge.handleReply(id, payload); -} - -export type ProcessTransformWorker = { - run: typeof run; - finish: typeof finish; - getProcessedPages: typeof getProcessedPages; - reply: typeof reply; -}; - -expose({ - run, - finish, - getProcessedPages, - reply, -});