Skip to content

Commit

Permalink
[INLONG-4203][Manager] Improve HTTP request and response parse in man…
Browse files Browse the repository at this point in the history
…ager client (apache#4204)
  • Loading branch information
leosanqing authored and vernedeng committed May 19, 2022
1 parent 8b2c755 commit f6ce55d
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,25 +121,22 @@ public InlongStream init() {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
StreamPipeline streamPipeline = inlongStream.createPipeline();
streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
String streamIndex = managerClient.createStreamInfo(streamInfo);
streamInfo.setId(Double.valueOf(streamIndex).intValue());
Double streamIndex = managerClient.createStreamInfo(streamInfo);
streamInfo.setId(streamIndex.intValue());
//Create source and update index
List<SourceRequest> sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values());
for (SourceRequest sourceRequest : sourceRequests) {
String sourceIndex = managerClient.createSource(sourceRequest);
sourceRequest.setId(Double.valueOf(sourceIndex).intValue());
sourceRequest.setId(managerClient.createSource(sourceRequest).intValue());
}
//Create sink and update index
List<SinkRequest> sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values());
for (SinkRequest sinkRequest : sinkRequests) {
String sinkIndex = managerClient.createSink(sinkRequest);
sinkRequest.setId(Double.valueOf(sinkIndex).intValue());
sinkRequest.setId(managerClient.createSink(sinkRequest).intValue());
}
//Create transform and update index
List<TransformRequest> transformRequests = Lists.newArrayList(streamContext.getTransformRequests().values());
for (TransformRequest transformRequest : transformRequests) {
String transformIndex = managerClient.createTransform(transformRequest);
transformRequest.setId(Double.valueOf(transformIndex).intValue());
transformRequest.setId(managerClient.createTransform(transformRequest).intValue());
}
return inlongStream;
}
Expand Down Expand Up @@ -200,8 +197,7 @@ private void initOrUpdateTransform() {
continue;
}
TransformRequest transformRequest = requestEntry.getValue();
String index = managerClient.createTransform(transformRequest);
transformRequest.setId(Double.valueOf(index).intValue());
transformRequest.setId(managerClient.createTransform(transformRequest).intValue());
}
}

Expand Down Expand Up @@ -238,8 +234,7 @@ private void initOrUpdateSource() {
continue;
}
SourceRequest sourceRequest = requestEntry.getValue();
String index = managerClient.createSource(sourceRequest);
sourceRequest.setId(Double.valueOf(index).intValue());
sourceRequest.setId(managerClient.createSource(sourceRequest).intValue());
}
}

Expand Down Expand Up @@ -276,8 +271,7 @@ private void initOrUpdateSink() {
continue;
}
SinkRequest sinkRequest = requestEntry.getValue();
String index = managerClient.createSink(sinkRequest);
sinkRequest.setId(Double.valueOf(index).intValue());
sinkRequest.setId(managerClient.createSink(sinkRequest).intValue());
}
}
}
Loading

0 comments on commit f6ce55d

Please sign in to comment.