Skip to content

Commit de7cd05

Browse files
committed
Implement copy rdf rel table
1 parent ea5d3b3 commit de7cd05

File tree

23 files changed

+176
-66
lines changed

23 files changed

+176
-66
lines changed

dataset/copy-test/node/turtle/copy.cypher

Lines changed: 0 additions & 1 deletion
This file was deleted.

dataset/copy-test/rdf/copy.cypher

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
COPY taxonomy_RESOURCE FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
2+
COPY taxonomy_TRIPLES FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
File renamed without changes.
File renamed without changes.

src/binder/bind/bind_copy.cpp

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,24 +191,27 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
191191
auto nodeOffset =
192192
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
193193
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
194-
tableSchema, std::move(columns), std::move(nodeOffset), nullptr, nullptr, containsSerial);
194+
tableSchema, std::move(columns), std::move(nodeOffset), nullptr /* boundOffsetExpression */,
195+
nullptr /* nbrOffsetExpression */, nullptr /* predicateOffsetExpression */, containsSerial);
195196
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
196197
}
197198

198199
std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
199200
std::unique_ptr<CopyDescription> copyDescription, TableSchema* tableSchema) {
200201
// For table with SERIAL columns, we need to read in serial from files.
201202
auto containsSerial = bindContainsSerial(tableSchema);
202-
auto columns = bindCopyRelColumns(tableSchema);
203+
auto columns = bindCopyRelColumns(tableSchema, copyDescription->fileType);
203204
auto nodeOffset =
204205
createVariable(std::string(Property::OFFSET_NAME), LogicalType{LogicalTypeID::INT64});
205206
auto boundOffset = createVariable(
206207
std::string(Property::REL_BOUND_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
207208
auto nbrOffset = createVariable(
208209
std::string(Property::REL_NBR_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
209-
auto boundCopyFromInfo =
210-
std::make_unique<BoundCopyFromInfo>(std::move(copyDescription), tableSchema,
211-
std::move(columns), std::move(nodeOffset), boundOffset, nbrOffset, containsSerial);
210+
auto predicateOffset = createVariable(
211+
std::string(Property::REL_PREDICATE_OFFSET_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN});
212+
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
213+
tableSchema, std::move(columns), std::move(nodeOffset), std::move(boundOffset),
214+
std::move(nbrOffset), std::move(predicateOffset), containsSerial);
212215
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
213216
}
214217

@@ -253,18 +256,34 @@ expression_vector Binder::bindCopyNodeColumns(
253256
return columnExpressions;
254257
}
255258

256-
expression_vector Binder::bindCopyRelColumns(TableSchema* tableSchema) {
259+
expression_vector Binder::bindCopyRelColumns(
260+
TableSchema* tableSchema, CopyDescription::FileType fileType) {
257261
expression_vector columnExpressions;
258-
columnExpressions.push_back(createVariable(
259-
std::string(Property::REL_FROM_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
260-
columnExpressions.push_back(createVariable(
261-
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
262-
for (auto& property : tableSchema->properties) {
263-
if (skipPropertyInFile(*property)) {
264-
continue;
265-
}
262+
switch (fileType) {
263+
case common::CopyDescription::FileType::TURTLE: {
264+
columnExpressions.push_back(createVariable("SUBJECT", LogicalType{LogicalTypeID::STRING}));
266265
columnExpressions.push_back(
267-
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
266+
createVariable("PREDICATE", LogicalType{LogicalTypeID::STRING}));
267+
columnExpressions.push_back(createVariable("OBJECT", LogicalType{LogicalTypeID::STRING}));
268+
} break;
269+
case common::CopyDescription::FileType::CSV:
270+
case common::CopyDescription::FileType::PARQUET:
271+
case common::CopyDescription::FileType::NPY: {
272+
columnExpressions.push_back(createVariable(std::string(Property::REL_FROM_PROPERTY_NAME),
273+
LogicalType{LogicalTypeID::ARROW_COLUMN}));
274+
columnExpressions.push_back(createVariable(
275+
std::string(Property::REL_TO_PROPERTY_NAME), LogicalType{LogicalTypeID::ARROW_COLUMN}));
276+
for (auto& property : tableSchema->properties) {
277+
if (skipPropertyInFile(*property)) {
278+
continue;
279+
}
280+
columnExpressions.push_back(
281+
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
282+
}
283+
} break;
284+
default: {
285+
throw NotImplementedException{"Binder::bindCopyRelColumns"};
286+
}
268287
}
269288
return columnExpressions;
270289
}

src/binder/copy/bound_copy_from.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
1313
std::move(copiedColumnExpressions), offsetExpression->copy(),
1414
tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
1515
tableSchema->tableType == common::TableType::REL ? nbrOffsetExpression->copy() : nullptr,
16+
tableSchema->tableType == common::TableType::REL ? predicateOffsetExpression->copy() :
17+
nullptr,
1618
containsSerial);
1719
}
1820

src/common/data_chunk/data_chunk.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,11 @@ void DataChunk::insert(uint32_t pos, std::shared_ptr<ValueVector> valueVector) {
99
valueVectors[pos] = std::move(valueVector);
1010
}
1111

12+
void DataChunk::resetAuxiliaryBuffer() {
13+
for (auto& valueVector : valueVectors) {
14+
valueVector->resetAuxiliaryBuffer();
15+
}
16+
}
17+
1218
} // namespace common
1319
} // namespace kuzu

src/include/binder/binder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ class Binder {
113113
catalog::TableSchema* tableSchema);
114114
expression_vector bindCopyNodeColumns(
115115
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
116-
expression_vector bindCopyRelColumns(catalog::TableSchema* tableSchema);
116+
expression_vector bindCopyRelColumns(
117+
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
117118
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Statement& statement);
118119
std::unique_ptr<common::CSVReaderConfig> bindParsingOptions(
119120
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>&

src/include/binder/copy/bound_copy_from.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,23 @@ struct BoundCopyFromInfo {
1919
// `boundOffsetExpression`, `nbrOffsetExpression` and `predicateOffsetExpression` are for rel tables only.
2020
std::shared_ptr<Expression> boundOffsetExpression;
2121
std::shared_ptr<Expression> nbrOffsetExpression;
22+
std::shared_ptr<Expression> predicateOffsetExpression;
2223

2324
bool containsSerial;
2425

2526
BoundCopyFromInfo(std::unique_ptr<common::CopyDescription> copyDesc,
2627
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
2728
std::shared_ptr<Expression> offsetExpression,
2829
std::shared_ptr<Expression> boundOffsetExpression,
29-
std::shared_ptr<Expression> nbrOffsetExpression, bool containsSerial)
30-
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move(
31-
columnExpressions)},
32-
offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move(
33-
boundOffsetExpression)},
34-
nbrOffsetExpression{std::move(nbrOffsetExpression)}, containsSerial{containsSerial} {}
30+
std::shared_ptr<Expression> nbrOffsetExpression,
31+
std::shared_ptr<Expression> predicateOffsetExpression, bool containsSerial)
32+
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema},
33+
columnExpressions{std::move(columnExpressions)}, offsetExpression{std::move(
34+
offsetExpression)},
35+
boundOffsetExpression{std::move(boundOffsetExpression)}, nbrOffsetExpression{std::move(
36+
nbrOffsetExpression)},
37+
predicateOffsetExpression{std::move(predicateOffsetExpression)}, containsSerial{
38+
containsSerial} {}
3539

3640
std::unique_ptr<BoundCopyFromInfo> copy();
3741
};

src/include/catalog/property.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class Property {
4040
static constexpr std::string_view OFFSET_NAME = "_OFFSET_";
4141
static constexpr std::string_view REL_BOUND_OFFSET_NAME = "_BOUND_OFFSET_";
4242
static constexpr std::string_view REL_NBR_OFFSET_NAME = "_NBR_OFFSET_";
43+
static constexpr std::string_view REL_PREDICATE_OFFSET_NAME = "_PREDICATE_OFFSET_";
4344

4445
Property(std::string name, std::unique_ptr<common::LogicalType> dataType)
4546
: Property{std::move(name), std::move(dataType), common::INVALID_PROPERTY_ID,

0 commit comments

Comments
 (0)