Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a queueTask method #1081

Open
wants to merge 7 commits into
base: 7.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,4 @@ package-lock.json
/example/public/index.js

es
.mocharc.js
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ All operation use es7 async/await to implement. All api is async function.
- [.putBucketInventory(name, inventory[, options])](#putBucketInventoryname-inventory-options)
- [.deleteBucketInventory(name, inventoryId[, options])](#deleteBucketInventoryname-inventoryid-options)
- [.listBucketInventory(name, [, options])](#listBucketInventoryname-options)
- utils
- [.queueTask(argList, customFunc[,options])](#queuetaskarglist-custom-options)

- [Object Operations](#object-operations)
- [.list(query[, options])](#listquery-options)
Expand Down Expand Up @@ -1579,6 +1581,64 @@ async function listBucketInventory() {
listBucketInventory();
```

## utils
Some public methods exposed by the sdk

### .queueTask(argList, custom[, options])

Execute tasks in a queue, default number of parallels 5 maximum cannot exceed 10.
Retry is not supported, please complete the retry operation yourself in the custom function

parameters:

- argList {any[]} Argument array for custom function
- customFunc {Function} Only async functions are supported
- [options] {Object} optinal parameters
- limit {Number} default 5


Success will return the object information.

object:

- sucessList complete tasks
- errorList error tasks

```javascript
client
.queueTask(
[
[
'1',
Buffer.from(
Array(1024 * 1024 * 10)
.fill('a')
.join('')
)
],
[
'2',
Buffer.from(
Array(1024 * 1024 * 10)
.fill('a')
.join('')
)
],
[
'3',
Buffer.from(
Array(1024 * 1024 * 10)
.fill('a')
.join('')
)
],
],
client.multipartUpload
).then(r=>console.log(r))

```


## Object Operations

All operations function return Promise, except `signatureUrl`.
Expand Down
71 changes: 34 additions & 37 deletions karma.conf.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,37 @@
const isCiEnv = process.env.ONCI;

module.exports = function(config) {
config.set({
plugins: [
require('karma-mocha'),
require('karma-browserify'),
require('karma-chrome-launcher'),
require('karma-firefox-launcher'),
require('karma-safari-launcher')
],
frameworks: ['mocha', 'browserify'],
browsers: isCiEnv ? ['ChromeHeadless'] : ['Chrome', 'Safari', 'Firefox'],
files: [
'test/browser/build/aliyun-oss-sdk.min.js',
'test/browser/build/tests.js'
],
// preprocessors: {
// 'dist/aliyun-oss-sdk.js': ['coverage']
// },
// coverageReporter: {
// type : 'html',
// dir : 'coverage-browser/'
// },
// reporters: ['progress', 'coverage'],
reporters: ['progress'],
port: 19876,
colors: true,
logLevel: config.LOG_INFO,
singleRun: true,
browserDisconnectTolerance: 3,
browserNoActivityTimeout: 30000,
concurrency: 1,
client: {
mocha: {
timeout: 6000
}
}
});
module.exports = function (config) {
config.set({
plugins: [
require('karma-mocha'),
require('karma-browserify'),
require('karma-chrome-launcher'),
require('karma-firefox-launcher'),
require('karma-safari-launcher')
],
frameworks: ['mocha', 'browserify'],
browsers: isCiEnv ? ['ChromeHeadless'] : ['Chrome', 'Safari', 'Firefox'],
files: ['test/browser/build/aliyun-oss-sdk.min.js', 'test/browser/build/tests.js'],
// preprocessors: {
// 'dist/aliyun-oss-sdk.js': ['coverage']
// },
// coverageReporter: {
// type : 'html',
// dir : 'coverage-browser/'
// },
// reporters: ['progress', 'coverage'],
reporters: ['progress'],
port: 19876,
colors: true,
logLevel: config.LOG_INFO,
singleRun: true,
browserDisconnectTolerance: 3,
browserNoActivityTimeout: 30000,
concurrency: 1,
client: {
mocha: {
timeout: 12000
}
}
});
};
4 changes: 4 additions & 0 deletions lib/common/utils/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { objectUrl } from './objectUrl';
import { parseXML } from './parseXML';
import { policy2Str } from './policy2Str';
import { WebFileReadStream } from './webFileReadStream';
import { queueTask } from './queueTask';
import { isAsync } from './isAsync';
declare const _default: {
_getObjectMeta: typeof _getObjectMeta;
authorization: typeof authorization;
Expand Down Expand Up @@ -66,5 +68,7 @@ declare const _default: {
_signatureForURL: typeof import("./signUtils")._signatureForURL;
};
WebFileReadStream: typeof WebFileReadStream;
queueTask: typeof queueTask;
isAsync: typeof isAsync;
};
export default _default;
6 changes: 5 additions & 1 deletion lib/common/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const parseXML_1 = require("./parseXML");
const policy2Str_1 = require("./policy2Str");
const signUtils_1 = __importDefault(require("./signUtils"));
const webFileReadStream_1 = require("./webFileReadStream");
const queueTask_1 = require("./queueTask");
const isAsync_1 = require("./isAsync");
exports.default = {
_getObjectMeta: _getObjectMeta_1._getObjectMeta,
authorization: authorization_1.authorization,
Expand Down Expand Up @@ -73,5 +75,7 @@ exports.default = {
parseXML: parseXML_1.parseXML,
policy2Str: policy2Str_1.policy2Str,
signUtils: signUtils_1.default,
WebFileReadStream: webFileReadStream_1.WebFileReadStream
WebFileReadStream: webFileReadStream_1.WebFileReadStream,
queueTask: queueTask_1.queueTask,
isAsync: isAsync_1.isAsync
};
1 change: 1 addition & 0 deletions lib/common/utils/isAsync.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export declare function isAsync(func: Function): boolean;
7 changes: 7 additions & 0 deletions lib/common/utils/isAsync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.isAsync = void 0;
function isAsync(func) {
return func.constructor.name === 'AsyncFunction';
}
exports.isAsync = isAsync;
10 changes: 10 additions & 0 deletions lib/common/utils/queueTask.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
declare type queueOptionsType = {
limit: number;
};
/**
* @param {any[]} argList - the arugments list for customFunc
* @param {Function} customFunc - customFunc
* @param {Object} options - limit default5
*/
export declare function queueTask(this: any, argList: any[], customFunc: Function, options?: queueOptionsType): Promise<unknown>;
export {};
74 changes: 74 additions & 0 deletions lib/common/utils/queueTask.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.queueTask = void 0;
const isAsync_1 = require("./isAsync");
const isFunction_1 = require("./isFunction");
/**
* @param {any[]} argList - the arugments list for customFunc
* @param {Function} customFunc - customFunc
* @param {Object} options - limit default5
*/
function queueTask(argList, customFunc, options = { limit: 5 }) {
const opts = Object.assign({}, options);
const { limit } = opts;
if (limit > 10) {
throw new Error('no more than 10 threads');
}
const isBrowserEnv = process && process.browser;
const errorList = [];
const sucessList = [];
const doing = [];
const queueList = argList.map(i => () => {
return new Promise((resolve, reject) => {
// browser
if (isBrowserEnv && isFunction_1.isFunction(customFunc)) {
customFunc
.apply(this, i)
.then(r => resolve(r))
.catch(err => reject(err));
}
// node
if (isAsync_1.isAsync(customFunc)) {
customFunc
.apply(this, i)
.then(r => resolve(r))
.catch(err => reject(err));
}
});
});
function task() {
return new Promise(resolve => {
const queueRun = () => {
if (!queueList || !queueList.length) {
return;
}
if (queueList.length > 0) {
const job = queueList.pop();
doing.push(job);
job()
.then(r => {
sucessList.push(r);
queueRun();
})
.catch(e => errorList.push(e.toString()))
.then(() => {
doing.pop();
if (!doing.length) {
resolve({
sucessList,
errorList
});
}
});
}
};
// limit customFun
for (let i = 0; i < limit; i++) {
queueRun();
}
});
}
// const result = await task();
return task();
}
exports.queueTask = queueTask;
6 changes: 5 additions & 1 deletion src/common/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import { parseXML } from './parseXML';
import { policy2Str } from './policy2Str';
import signUtils from './signUtils';
import { WebFileReadStream } from './webFileReadStream';
import { queueTask } from './queueTask';
import { isAsync } from './isAsync';

export default {
_getObjectMeta,
Expand Down Expand Up @@ -69,5 +71,7 @@ export default {
parseXML,
policy2Str,
signUtils,
WebFileReadStream
WebFileReadStream,
queueTask,
isAsync
};
3 changes: 3 additions & 0 deletions src/common/utils/isAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function isAsync(func: Function): boolean {
return func.constructor.name === 'AsyncFunction';
}
79 changes: 79 additions & 0 deletions src/common/utils/queueTask.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { isAsync } from './isAsync';
import { isFunction } from './isFunction';

type queueOptionsType = {
limit: number;
};

/**
* @param {any[]} argList - the arugments list for customFunc
* @param {Function} customFunc - customFunc
* @param {Object} options - limit default5
*/
export function queueTask(this: any, argList: any[], customFunc: Function, options: queueOptionsType = { limit: 5 }) {
const opts = Object.assign({}, options);
const { limit } = opts;
if (limit > 10) {
throw new Error('no more than 10 threads');
}

const isBrowserEnv = process && (process as any).browser;
const errorList: any[] = [];
const sucessList: any[] = [];
const doing: any[] = [];
const queueList: any[] = argList.map(i => () => {
return new Promise((resolve, reject) => {
// browser
if (isBrowserEnv && isFunction(customFunc)) {
customFunc
.apply(this, i)
.then(r => resolve(r))
.catch(err => reject(err));
}
// node
if (isAsync(customFunc)) {
customFunc
.apply(this, i)
.then(r => resolve(r))
.catch(err => reject(err));
}
});
});

function task() {
return new Promise(resolve => {
const queueRun = () => {
if (!queueList || !queueList.length) {
return;
}
if (queueList.length > 0) {
const job = queueList.pop();
doing.push(job);
job()
.then(r => {
sucessList.push(r);
queueRun();
})
.catch(e => errorList.push(e.toString()))
.then(() => {
doing.pop();
if (!doing.length) {
resolve({
sucessList,
errorList
});
}
});
}
};

// limit customFun
for (let i = 0; i < limit; i++) {
queueRun();
}
});
}

// const result = await task();
return task();
}
Loading