Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IKC-412 Apply data masking policies to topic messages #395

Open
wants to merge 4 commits into
base: IKC-410-data-masking-policy-form
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.consdata.kouncil.datamasking;

/**
 * Unchecked exception used by the data masking code to report a failure
 * without forcing callers to declare a checked exception.
 */
public class DataMaskingExcpetion extends RuntimeException {

    /**
     * Wraps the given exception; it stays reachable through {@link #getCause()}.
     *
     * @param e the underlying failure
     */
    public DataMaskingExcpetion(final Exception e) {
        super(e);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.consdata.kouncil.datamasking;

import com.consdata.kouncil.clusters.ClusterRepository;
import com.consdata.kouncil.model.cluster.Cluster;
import com.consdata.kouncil.model.datamasking.Policy;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * Applies the stored data masking policies to messages read from a topic.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class DataMaskingService {

    private final PolicyRepository repository;
    private final ClusterRepository clusterRepository;

    /**
     * Runs every policy that targets the given cluster and topic over the message.
     *
     * @param message   message payload (record key or value); may be {@code null}
     * @param topic     name of the topic the message was read from
     * @param clusterId cluster identifier; compared against {@link Cluster#getName()}
     * @return the message after all matching policies were applied, or {@code null}
     *         when {@code message} is {@code null}
     */
    public String maskTopicMessage(String message, String topic, String clusterId) {
        // Records without a key/value have nothing to mask; passing null on would
        // fail inside PolicyApplier as soon as a single policy matches.
        if (message == null) {
            return null;
        }

        String masked = message;
        for (Policy policy : getPoliciesForClusterAndTopic(topic, clusterId)) {
            masked = PolicyApplier.apply(policy, masked);
        }
        return masked;
    }

    /**
     * Selects the policies that own at least one resource matching BOTH the cluster
     * and the topic. The resource's topic is interpreted as a regular expression that
     * has to match the whole topic name.
     *
     * <p>NOTE(review): all clusters and policies are loaded on every call — consider
     * caching if this shows up in profiles.
     * <p>NOTE(review): {@code Policy#getApplyToAllResources()} is not consulted here —
     * confirm whether such policies should bypass the resource check.
     */
    private List<Policy> getPoliciesForClusterAndTopic(String topic, String clusterId) {
        Map<Long, Cluster> clusters = StreamSupport.stream(clusterRepository.findAll().spliterator(), false)
                .collect(Collectors.toMap(Cluster::getId, cluster -> cluster));
        Set<Policy> policies = StreamSupport.stream(repository.findAll().spliterator(), false)
                .collect(Collectors.toSet());

        return policies.stream()
                // Cluster and topic must match on the SAME resource; checking them in two
                // separate passes would let one resource satisfy the cluster and another the topic.
                .filter(policy -> policy.getResources().stream().anyMatch(resource -> {
                    Cluster cluster = clusters.get(resource.getCluster());
                    // A resource may reference a cluster that no longer exists; it matches nothing.
                    return cluster != null
                            && cluster.getName().equals(clusterId)
                            && Pattern.matches(resource.getTopic(), topic);
                }))
                .toList();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.consdata.kouncil.datamasking;

import com.consdata.kouncil.model.datamasking.MaskingType;
import com.consdata.kouncil.model.datamasking.Policy;
import com.consdata.kouncil.model.datamasking.PolicyField;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Applies a single {@link Policy} to a JSON message by replacing the values of the
 * configured fields with masking signs.
 *
 * <p>Field paths are dot-separated (for example {@code address.street}). Arrays on the
 * path are transparent: the rest of the path is applied to every element.
 */
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PolicyApplier {

    private static final int MASKING_SIGNS_AMOUNT = 5;
    private static final String MASKING_SIGN = "*";
    private static final String FIELD_SEPARATOR = "\\.";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static {
        // Parse floating-point JSON numbers as BigDecimal instead of double.
        OBJECT_MAPPER.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
    }

    /**
     * Masks every field listed in the policy and returns the re-serialized JSON.
     *
     * @param policy policy whose fields describe what to mask and how
     * @param value  JSON text to mask; may be {@code null}
     * @return the JSON produced from the parsed tree after masking, or {@code null}
     *         when {@code value} is {@code null}
     * @throws DataMaskingExcpetion when {@code value} cannot be parsed as JSON
     */
    public static String apply(Policy policy, String value) {
        if (value == null) {
            // Nothing to parse (e.g. a record without a key).
            return null;
        }
        try {
            JsonNode jsonNode = OBJECT_MAPPER.readTree(new StringReader(value));
            for (PolicyField field : policy.getFields()) {
                String fieldPath = field.getField();
                if (fieldPath == null) {
                    continue;
                }
                String[] path = fieldPath.split(FIELD_SEPARATOR);
                // A path made only of separators (e.g. ".") splits to nothing and addresses no field.
                if (path.length == 0) {
                    continue;
                }
                traversFieldPathAndUpdateValue(field, jsonNode, path);
            }
            return jsonNode.toString();
        } catch (IOException e) {
            throw new DataMaskingExcpetion(e);
        }
    }

    /**
     * Follows {@code path} starting at {@code jsonNode} and masks what it points to.
     *
     * <p>When the path is used up and the addressed node is still a container, every
     * scalar below it is masked, so a policy on {@code address} covers all of
     * {@code address.*} instead of failing on an empty path.
     * NOTE(review): masking the whole subtree is the fail-closed choice — confirm it
     * matches the intended product behavior.
     */
    private static void traversFieldPathAndUpdateValue(PolicyField policyField, JsonNode jsonNode, String[] path) {
        if (path.length == 0) {
            maskAllValues(policyField, jsonNode);
            return;
        }

        String fieldNameFromPath = path[0];
        // get(String) yields null for a missing field and for any node that is not an object.
        JsonNode childNode = jsonNode.get(fieldNameFromPath);
        if (childNode == null) {
            return;
        }

        String[] remainingPath = Arrays.copyOfRange(path, 1, path.length);
        if (childNode.isObject()) {
            traversFieldPathAndUpdateValue(policyField, childNode, remainingPath);
        } else if (childNode.isArray()) {
            processArrayNode(policyField, (ArrayNode) childNode, remainingPath);
        } else {
            // Scalar reached: it is masked even if path segments remain.
            ((ObjectNode) jsonNode).put(fieldNameFromPath, maskFieldValue(policyField.getMaskingType(), childNode.asText()));
        }
    }

    /**
     * Masks scalar elements of the array in place and applies the remaining path to
     * every non-scalar element.
     */
    private static void processArrayNode(PolicyField policyField, ArrayNode arrayNode, String[] remainingPath) {
        for (int i = 0; i < arrayNode.size(); i++) {
            JsonNode element = arrayNode.get(i);
            if (element.isValueNode()) {
                arrayNode.set(i, maskFieldValue(policyField.getMaskingType(), element.asText()));
            } else {
                traversFieldPathAndUpdateValue(policyField, element, remainingPath);
            }
        }
    }

    /**
     * Masks every scalar found anywhere below {@code node}.
     */
    private static void maskAllValues(PolicyField policyField, JsonNode node) {
        if (node.isObject()) {
            ObjectNode objectNode = (ObjectNode) node;
            // Snapshot the names first so values are not replaced while the node is being iterated.
            List<String> fieldNames = new ArrayList<>();
            objectNode.fieldNames().forEachRemaining(fieldNames::add);
            for (String fieldName : fieldNames) {
                JsonNode child = objectNode.get(fieldName);
                if (child.isValueNode()) {
                    objectNode.put(fieldName, maskFieldValue(policyField.getMaskingType(), child.asText()));
                } else {
                    maskAllValues(policyField, child);
                }
            }
        } else if (node.isArray()) {
            // An empty remaining path makes nested containers fall back into maskAllValues.
            processArrayNode(policyField, (ArrayNode) node, new String[0]);
        }
    }

    /**
     * Builds the masked replacement for a single value.
     *
     * <ul>
     *   <li>values shorter than {@value #MASKING_SIGNS_AMOUNT} characters, or type {@code ALL}:
     *       every character is replaced;</li>
     *   <li>{@code FIRST_5}: the first five characters are replaced;</li>
     *   <li>{@code LAST_5}: the last five characters are replaced;</li>
     *   <li>any other type (including {@code null}): the value is returned unchanged.</li>
     * </ul>
     */
    private static String maskFieldValue(MaskingType policyMaskingType, String fieldValue) {
        String newFieldValue = fieldValue;
        if (fieldValue.length() < MASKING_SIGNS_AMOUNT || MaskingType.ALL.equals(policyMaskingType)) {
            newFieldValue = MASKING_SIGN.repeat(fieldValue.length());
        } else if (MaskingType.FIRST_5.equals(policyMaskingType)) {
            newFieldValue = MASKING_SIGN.repeat(MASKING_SIGNS_AMOUNT) + fieldValue.substring(MASKING_SIGNS_AMOUNT);
        } else if (MaskingType.LAST_5.equals(policyMaskingType)) {
            newFieldValue = fieldValue.substring(0, fieldValue.length() - MASKING_SIGNS_AMOUNT) + MASKING_SIGN.repeat(MASKING_SIGNS_AMOUNT);
        }
        return newFieldValue;
    }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.consdata.kouncil.datamasking.dto;

import com.consdata.kouncil.model.datamasking.MaskingType;
import java.util.Set;
import lombok.Data;

Expand All @@ -9,7 +8,6 @@ public class PolicyDto {

private Long id;
private String name;
private MaskingType maskingType;
private Boolean applyToAllResources;
private Set<PolicyFieldDto> fields;
private Set<PolicyResourceDto> resources;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.consdata.kouncil.datamasking.dto;

import com.consdata.kouncil.model.datamasking.FieldFindRule;
import com.consdata.kouncil.model.datamasking.MaskingType;
import lombok.Data;

@Data
public class PolicyFieldDto {

private Long id;
private FieldFindRule findRule;
private MaskingType maskingType;
private String field;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ public class Policy {
@Column(name = "NAME")
private String name;

@Column(name = "MASKING_TYPE")
@Enumerated(EnumType.STRING)
private MaskingType maskingType;

@OneToMany(cascade = {CascadeType.PERSIST, CascadeType.MERGE}, fetch = FetchType.EAGER, orphanRemoval = true)
@JoinColumn(name = "POLICY_ID")
private Set<PolicyField> fields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class PolicyField {
@Column(name = "FIELD")
private String field;

@Column(name = "FIND_RULE")
@Column(name = "MASKING_TYPE")
@Enumerated(EnumType.STRING)
private FieldFindRule findRule;
private MaskingType maskingType;

@ManyToOne(cascade = {CascadeType.PERSIST, CascadeType.MERGE})
@JoinColumn(name = "POLICY_ID", insertable = false, updatable = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.consdata.kouncil.KafkaConnectionService;
import com.consdata.kouncil.KouncilRuntimeException;
import com.consdata.kouncil.MessagesHelper;
import com.consdata.kouncil.datamasking.DataMaskingService;
import com.consdata.kouncil.serde.deserialization.DeserializationService;
import com.consdata.kouncil.serde.deserialization.DeserializedMessage;
import com.consdata.kouncil.serde.serialization.SerializationService;
Expand Down Expand Up @@ -58,20 +59,21 @@ public class TopicService {
private final KafkaConnectionService kafkaConnectionService;
private final SerializationService serializationService;
private final DeserializationService deserializationService;
private final DataMaskingService dataMaskingService;
private final MessagesHelper messagesHelper;
private static final int RESEND_MAX_POLL_RECORDS = 100;

@Value("${resendHeadersToKeep:}")
private String[] resendHeadersToKeep;

TopicMessagesDto getTopicMessages(@PathVariable("topicName") String topicName,
@PathVariable("partition") String partitions,
@RequestParam("page") String pageParam,
@RequestParam("limit") String limitParam,
@RequestParam(value = "beginningTimestampMillis", required = false) Long beginningTimestampMillis,
@RequestParam(value = "endTimestampMillis", required = false) Long endTimestampMillis,
@RequestParam(value = "offset", required = false) Long offset,
@RequestParam("serverId") String serverId) {
@PathVariable("partition") String partitions,
@RequestParam("page") String pageParam,
@RequestParam("limit") String limitParam,
@RequestParam(value = "beginningTimestampMillis", required = false) Long beginningTimestampMillis,
@RequestParam(value = "endTimestampMillis", required = false) Long endTimestampMillis,
@RequestParam(value = "offset", required = false) Long offset,
@RequestParam("serverId") String serverId) {
messagesHelper.validateTopics(serverId, singletonList(topicName));
int limit = Integer.parseInt(limitParam); // per partition!
long page = Long.parseLong(pageParam); // per partition!
Expand Down Expand Up @@ -191,12 +193,13 @@ private void pollMessages(String clusterId, int limit, KafkaConsumer<Bytes, Byte
DeserializedMessage deserializedMessage = deserializationService.deserialize(clusterId, consumerRecord);
if (messagesCount < limit) {
messagesCount += 1;

messages.add(TopicMessage
.builder()
.key(deserializedMessage.getKeyData().getDeserialized())
.key(dataMaskingService.maskTopicMessage(deserializedMessage.getKeyData().getDeserialized(), partition.topic(), clusterId))
.keyFormat(deserializedMessage.getKeyData().getMessageFormat())
.value(deserializedMessage.getValueData().getDeserialized())
.originalValue(deserializedMessage.getValueData().getOriginalValue())
.value(dataMaskingService.maskTopicMessage(deserializedMessage.getValueData().getDeserialized(), partition.topic(), clusterId))
.originalValue(dataMaskingService.maskTopicMessage(deserializedMessage.getValueData().getOriginalValue(), partition.topic(), clusterId))
.valueFormat(deserializedMessage.getValueData().getMessageFormat())
.offset(consumerRecord.offset())
.partition(consumerRecord.partition())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ create table policy
(
id bigint not null primary key,
apply_to_all_resources boolean,
name varchar(255),
masking_type varchar(255)
name varchar(255)
);

create table policy_field
(
id bigint not null primary key,
field varchar(255),
find_rule varchar(255),
policy_id bigint
id bigint not null primary key,
field varchar(255),
masking_type varchar(255),
policy_id bigint
constraint fkppv2kuelp29jag142kqgvober references policy
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.consdata.kouncil.datamasking;

import static org.assertj.core.api.Assertions.assertThat;

import com.consdata.kouncil.model.datamasking.MaskingType;
import com.consdata.kouncil.model.datamasking.Policy;
import com.consdata.kouncil.model.datamasking.PolicyField;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Objects;
import org.junit.jupiter.api.Test;

/**
 * Checks {@link PolicyApplier#apply(Policy, String)} against JSON fixtures loaded from
 * the test classpath.
 */
class PolicyApplierTest {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Test
    void shouldApplyPoliciesToSimpleMessage() throws URISyntaxException, IOException {
        assertMaskedAsExpected(
                "data-masking/input/simple_object_test.json",
                "data-masking/output/simple_object_test_result.json",
                createField(MaskingType.LAST_5, "firstName"),
                createField(MaskingType.FIRST_5, "lastName"));
    }

    @Test
    void shouldApplyPoliciesToComplexMessage() throws URISyntaxException, IOException {
        assertMaskedAsExpected(
                "data-masking/input/complex_object_test.json",
                "data-masking/output/complex_object_test_result.json",
                createField(MaskingType.LAST_5, "firstName"),
                createField(MaskingType.FIRST_5, "lastName"),
                createField(MaskingType.ALL, "address.street"),
                createField(MaskingType.FIRST_5, "longestTravels"));
    }

    @Test
    void shouldApplyPoliciesToMessageWithArrays() throws URISyntaxException, IOException {
        assertMaskedAsExpected(
                "data-masking/input/object_with_arrays_test.json",
                "data-masking/output/object_with_arrays_test_result.json",
                createField(MaskingType.ALL, "firstName"),
                createField(MaskingType.FIRST_5, "lastName"),
                createField(MaskingType.FIRST_5, "salary"),
                createField(MaskingType.FIRST_5, "address.street"),
                createField(MaskingType.LAST_5, "address.street"),
                createField(MaskingType.LAST_5, "address.postalCodes"),
                createField(MaskingType.ALL, "hobbies"));
    }

    /**
     * Applies a policy made of {@code fields} to the input fixture and compares the outcome
     * with the expected fixture, normalized through the same JSON tree serialization.
     */
    private void assertMaskedAsExpected(String inputPath, String expectedPath, PolicyField... fields)
            throws URISyntaxException, IOException {
        String input = loadFile(inputPath);

        Policy policy = new Policy();
        policy.setFields(new HashSet<>());
        for (PolicyField policyField : fields) {
            policy.getFields().add(policyField);
        }
        String masked = PolicyApplier.apply(policy, input);

        String expected = objectMapper.readTree(new StringReader(loadFile(expectedPath))).toString();
        assertThat(masked).isEqualTo(expected);
    }

    /** Reads a classpath resource into a string; fails when the resource is missing. */
    private String loadFile(String filePath) throws URISyntaxException, IOException {
        ClassLoader classLoader = PolicyApplierTest.class.getClassLoader();
        return Files.readString(Paths.get(Objects.requireNonNull(classLoader.getResource(filePath)).toURI()));
    }

    /** Creates a policy field with the given masking type and dot-separated field path. */
    private PolicyField createField(MaskingType maskingType, String field) {
        PolicyField created = new PolicyField();
        created.setMaskingType(maskingType);
        created.setField(field);
        return created;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.consdata.kouncil.datamasking.dto.PolicyDto;
import com.consdata.kouncil.datamasking.dto.PolicyFieldDto;
import com.consdata.kouncil.datamasking.dto.PolicyResourceDto;
import com.consdata.kouncil.model.datamasking.MaskingType;
import com.consdata.kouncil.model.datamasking.Policy;
import java.util.HashSet;
import org.junit.jupiter.api.Test;
Expand All @@ -20,7 +19,6 @@ void should_convert_to_entity() {
policyDto.setId(1L);
policyDto.setName("test");
policyDto.setApplyToAllResources(false);
policyDto.setMaskingType(MaskingType.ALL);
policyDto.setFields(new HashSet<>());
policyDto.getFields().add(createField(1L));
policyDto.getFields().add(createField(2L));
Expand All @@ -35,7 +33,6 @@ void should_convert_to_entity() {
() -> assertThat(policy.getId()).isEqualTo(policyDto.getId()),
() -> assertThat(policy.getName()).isEqualTo(policyDto.getName()),
() -> assertThat(policy.getApplyToAllResources()).isEqualTo(policyDto.getApplyToAllResources()),
() -> assertThat(policy.getMaskingType()).isEqualTo(policyDto.getMaskingType()),
() -> assertThat(policy.getFields()).hasSize(policyDto.getFields().size()),
() -> assertThat(policy.getResources()).hasSize(policyDto.getResources().size())
);
Expand Down
Loading
Loading