Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for fop persistent type #438

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
StatObjectResult
} from "./StorageResponseInterface";

export declare type callback = (e?: Error, respBody?: any, respInfo?: any) => void;
export declare type callback<T = any> = (e?: Error, respBody?: T, respInfo?: any) => void;

export declare namespace auth {
namespace digest {
Expand Down Expand Up @@ -1050,6 +1050,11 @@ export declare namespace fop {
* 结果是否强制覆盖已有的同名文件
*/
force?: boolean;

/**
* 为 `1` 时开启闲时任务
*/
type?: number;
}
class OperationManager {
mac: auth.digest.Mac;
Expand All @@ -1066,14 +1071,44 @@ export declare namespace fop {
* @param options
* @param callback
*/
pfop(bucket: string, key: string, fops: string[], pipeline: string, options: PfopOptions | null, callback: callback): void;
pfop(
bucket: string,
key: string,
fops: string[],
pipeline: string,
options: PfopOptions | null,
callback: callback<{
persistentId: string
}>
): void;

/**
* 查询持久化数据处理进度
* @param persistentId pfop操作返回的持久化处理ID
* @param persistentId pfop 操作返回的持久化处理ID
* @param callback
*/
prefop(persistentId: string, callback: callback): void;
prefop(
persistentId: string,
callback: callback<{
id: string,
pipeline: string,
code: number,
desc: string,
reqid: string,
inputBucket: string,
inputKey: string,
creationDate: string,
type: number,
items: {
cmd: string,
code: number,
desc: string,
returnOld: number,
error?: string,
hash?: string,
}[]
}>
): void;
}
}

Expand Down Expand Up @@ -1744,6 +1779,7 @@ export declare namespace rs {
persistentOps?: string;
persistentNotifyUrl?: string;
persistentPipeline?: string;
persistentType?: string;

fsizeLimit?: number;
fsizeMin?: number;
Expand Down
78 changes: 52 additions & 26 deletions qiniu/fop.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,42 @@ function OperationManager (mac, config) {
this.config = config || new conf.Config();
}

// 发送持久化数据处理请求
// @param bucket - 空间名称
// @param key - 文件名称
// @param fops - 处理指令集合
// @param pipeline - 处理队列名称
// @param options - 可选参数
// notifyURL 回调业务服务器,通知处理结果
// force 结果是否强制覆盖已有的同名文件
// @param callbackFunc(err, respBody, respInfo) - 回调函数
OperationManager.prototype.pfop = function (bucket, key, fops, pipeline,
options, callbackFunc) {
/**
* @typedef {function(Error, any, IncomingMessage)} OperationCallback
*/

/**
* @param {string} bucket 空间名称
* @param {string} key 文件名称
* @param {string[]} fops 处理指令
* @param {string} pipeline 队列名称
* @param {object} options 可选参数
* @param {string} [options.notifyURL] 回调业务服务器,通知处理结果
* @param {boolean} [options.force] 是否强制覆盖已有的同名文件
* @param {string} [options.type] 为 `1` 时,开启闲时任务
* @param {OperationCallback} callbackFunc 回调函数
*/
OperationManager.prototype.pfop = function (
bucket,
key,
fops,
pipeline,
options,
callbackFunc
) {
options = options || {};
// 必须参数
var reqParams = {
const reqParams = {
bucket: bucket,
key: key,
pipeline: pipeline,
fops: fops.join(';')
};

// pipeline
if (!pipeline) {
delete reqParams.pipeline;
}

// notifyURL
if (options.notifyURL) {
reqParams.notifyURL = options.notifyURL;
Expand All @@ -41,6 +57,11 @@ OperationManager.prototype.pfop = function (bucket, key, fops, pipeline,
reqParams.force = 1;
}

const persistentType = parseInt(options.type, 10);
if (!isNaN(persistentType)) {
reqParams.type = options.type;
}

util.prepareZone(this, this.mac.accessKey, bucket, function (err, ctx) {
if (err) {
callbackFunc(err, null, null);
Expand All @@ -51,27 +72,32 @@ OperationManager.prototype.pfop = function (bucket, key, fops, pipeline,
};

function pfopReq (mac, config, reqParams, callbackFunc) {
var scheme = config.useHttpsDomain ? 'https://' : 'http://';
var requestURI = scheme + config.zone.apiHost + '/pfop/';
var reqBody = querystring.stringify(reqParams);
var auth = util.generateAccessToken(mac, requestURI, reqBody);
const scheme = config.useHttpsDomain ? 'https://' : 'http://';
const requestURI = scheme + config.zone.apiHost + '/pfop/';
const reqBody = querystring.stringify(reqParams);
const auth = util.generateAccessToken(mac, requestURI, reqBody);
rpc.postWithForm(requestURI, reqBody, auth, callbackFunc);
}

// 查询持久化数据处理进度
// @param persistentId
// @callbackFunc(err, respBody, respInfo) - 回调函数
OperationManager.prototype.prefop = function (persistentId, callbackFunc) {
var apiHost = 'api.qiniu.com';
/**
* 查询持久化数据处理进度
* @param {string} persistentId
* @param {OperationCallback} callbackFunc 回调函数
*/
OperationManager.prototype.prefop = function (
persistentId,
callbackFunc
) {
let apiHost = 'api.qiniu.com';
if (this.config.zone) {
apiHost = this.config.zone.apiHost;
}

var scheme = this.config.useHttpsDomain ? 'https://' : 'http://';
var requestURI = scheme + apiHost + '/status/get/prefop';
var reqParams = {
const scheme = this.config.useHttpsDomain ? 'https://' : 'http://';
const requestURI = scheme + apiHost + '/status/get/prefop';
const reqParams = {
id: persistentId
};
var reqBody = querystring.stringify(reqParams);
const reqBody = querystring.stringify(reqParams);
rpc.postWithForm(requestURI, reqBody, null, callbackFunc);
};
2 changes: 1 addition & 1 deletion qiniu/storage/form.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ function putReq (
* @param {string | null} key
* @param {any} body
* @param {PutExtra | null} putExtra
* @param {reqCallback} callbackFunc
* @param {reqCallback} [callbackFunc]
* @returns {Promise<UploadResult>}
*/
FormUploader.prototype.put = function (
Expand Down
1 change: 1 addition & 0 deletions qiniu/storage/rs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,7 @@ function _putPolicyBuildInKeys () {
* @property {string} [persistentOps]
* @property {string} [persistentNotifyUrl]
* @property {string} [persistentPipeline]
* @property {string} [persistentType]
* @property {number} [fsizeLimit]
* @property {number} [fsizeMin]
* @property {string} [mimeLimit]
Expand Down
4 changes: 2 additions & 2 deletions test/conftest.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function getEnvConfig () {
}
exports.getEnvConfig = getEnvConfig;

function checkEnvConfigAndExit () {
function checkEnvConfigOkOrExit () {
const envConfig = getEnvConfig();
if (
Object.keys(envConfig).some(k => !envConfig[k])
Expand All @@ -21,7 +21,7 @@ function checkEnvConfigAndExit () {
process.exit(0);
}
}
exports.checkEnvConfigAndExit = checkEnvConfigAndExit;
exports.checkEnvConfigOrExit = checkEnvConfigOkOrExit;

/**
* @typedef Param
Expand Down
Loading
Loading