Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event store #124

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,12 @@
target/
dependency-reduced-pom.xml
pom.xml.versionsBackup

# ide
.classpath
.factorypath
.project
.settings

# emacs
*~
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ The following parameters might be needed in addition to the configuration option
| --spi-cassandra-connection-default-password | Password |
| --spi-cassandra-connection-default-keyspace | Keyspace-name (will be generated by the extension if it does not exist at startup-time) |
| --spi-cassandra-connection-default-replication-factor | Replication factor used if the extension creates the keyspace with simple strategy |
| --spi-event-store-provider=cassandra-event-store | Select the `CassandraEventStoreProvider` |
| --spi-event-store-cassandra-event-store-enabled=true | Enables the `CassandraEventStoreProvider` |

## Deviations from standard storage providers

Expand All @@ -63,6 +65,20 @@ This extension supports additional checks to prevent setting username to a value

To enable these checks for a realm, set its attribute `enableCheckForDuplicatesAcrossUsernameAndEmail` to `true` (default when not set: `false`)

### EventStoreProvider

#### Admin event `resourcePath` LIKE

Because `LIKE` queries require secondary indexes to be enabled in Cassandra, they are not currently supported for querying `resourcePath` of admin events.

#### Query performance

Performance of event queries using sparse fields (i.e. those generated by the Admin UI) are not performant, as Cassandra is forced to use `ALLOW FILTERING`. Future work could include a separate lookup table with client-side result filtering.

#### Pagination performance

Because Cassandra does not support OFFSET/LIMIT type queries, this is done in Java. This will likely consume significant resources for large event result sets.

## Development

### Private image registries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import de.arbeitsagentur.opdt.keycloak.cassandra.authSession.persistence.AuthSessionRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.client.persistence.ClientRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.clientScope.persistence.ClientScopeRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.EventRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.group.persistence.GroupRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.loginFailure.persistence.LoginFailureRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.realm.persistence.RealmRepository;
Expand All @@ -27,5 +28,5 @@
import de.arbeitsagentur.opdt.keycloak.cassandra.user.persistence.UserRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.userSession.persistence.UserSessionRepository;

public interface CompositeRepository extends GroupRepository, RoleRepository, UserRepository, RealmRepository, UserSessionRepository, AuthSessionRepository, LoginFailureRepository, SingleUseObjectRepository, ClientRepository, ClientScopeRepository {
public interface CompositeRepository extends GroupRepository, RoleRepository, UserRepository, RealmRepository, UserSessionRepository, AuthSessionRepository, LoginFailureRepository, SingleUseObjectRepository, ClientRepository, ClientScopeRepository, EventRepository {
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import de.arbeitsagentur.opdt.keycloak.cassandra.client.persistence.entities.Client;
import de.arbeitsagentur.opdt.keycloak.cassandra.clientScope.persistence.ClientScopeRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.clientScope.persistence.entities.ClientScopes;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.EventRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.entities.AdminEventEntity;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.entities.EventEntity;
import de.arbeitsagentur.opdt.keycloak.cassandra.group.persistence.GroupRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.group.persistence.entities.Groups;
import de.arbeitsagentur.opdt.keycloak.cassandra.loginFailure.persistence.LoginFailureRepository;
Expand All @@ -45,6 +48,8 @@
import de.arbeitsagentur.opdt.keycloak.cassandra.userSession.persistence.entities.UserSessionToAttributeMapping;
import lombok.Setter;
import org.keycloak.common.util.MultivaluedHashMap;
import org.keycloak.events.EventQuery;
import org.keycloak.events.admin.AdminEventQuery;

import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -74,6 +79,8 @@ public class ManagedCompositeCassandraRepository implements CompositeRepository

private ClientScopeRepository clientScopeRepository;

private EventRepository eventRepository;

public Stream<User> findAllUsers() {
return this.userRepository.findAllUsers();
}
Expand Down Expand Up @@ -577,4 +584,37 @@ public Groups getGroupsByRealmId(String realmId) {
public void deleteRealmGroups(String realmId) {
this.groupRepository.deleteRealmGroups(realmId);
}

public void insertEvent(EventEntity event) {
this.eventRepository.insertEvent(event);
}

public void insertAdminEvent(AdminEventEntity adminEvent) {
this.eventRepository.insertAdminEvent(adminEvent);
}

public void deleteRealmEvents(String realmId) {
this.eventRepository.deleteRealmEvents(realmId);
}

public void deleteRealmEvents(String realmId, long olderThan) {
this.eventRepository.deleteRealmEvents(realmId, olderThan);
}

public void deleteAdminRealmEvents(String realmId) {
this.eventRepository.deleteAdminRealmEvents(realmId);
}

public void deleteAdminRealmEvents(String realmId, long olderThan) {
this.eventRepository.deleteAdminRealmEvents(realmId, olderThan);
}

public EventQuery eventQuery() {
return this.eventRepository.eventQuery();
}

public AdminEventQuery adminEventQuery() {
return this.eventRepository.adminEventQuery();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import com.datastax.oss.driver.internal.core.type.codec.extras.enums.EnumNameCodec;
Expand All @@ -39,6 +42,10 @@
import de.arbeitsagentur.opdt.keycloak.cassandra.clientScope.persistence.ClientScopeMapperBuilder;
import de.arbeitsagentur.opdt.keycloak.cassandra.clientScope.persistence.ClientScopeRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.clientScope.persistence.entities.ClientScopeValue;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.CassandraEventRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.EventMapper;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.EventMapperBuilder;
import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.EventRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.group.persistence.CassandraGroupRepository;
import de.arbeitsagentur.opdt.keycloak.cassandra.group.persistence.GroupMapper;
import de.arbeitsagentur.opdt.keycloak.cassandra.group.persistence.GroupMapperBuilder;
Expand Down Expand Up @@ -85,6 +92,7 @@

import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -164,9 +172,20 @@ public void init(Config.Scope scope) {
repository = createRepository(cqlSession);
}

public CqlSessionBuilder configure(CqlSessionBuilder cqlSessionBuilder) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done with external file based configuration and not hardcoded

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @opdt I was having problems getting the migration to run in the tests without a timeout. Can you point me to some documentation/examples of how this is achieved using external file based configuration? I'm not super familiar with cassandra, so the options are new to me.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been struggling to figure out how to load an external file based configuration @opdt. Is there an example you're aware of?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally figured out how to load config from a file. I have a branch here, so let me know if you want me to do a separate PR. https://github.com/opdt/keycloak-cassandra-extension/compare/main...xgp:keycloak-cassandra-extension:xgp/config-file?expand=1

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You dont need any custom code to load the config from a file. The cassandra driver does it automatically: https://docs.datastax.com/en/developer/java-driver/4.17/manual/core/configuration/ (application.conf in Classpath)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. But if you want to allow it to be put somewhere else and not bundled with the jar, then you have to tell Cassandra where to load it from.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, I might want to have k8s mount the conf file in the pod and then need to tell the extension where to load it from. This decouples the configuration from the library.

Copy link
Owner

@opdt opdt Feb 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats possible via -Dconfig.file=/path/to/application.confvia the underyling Lightbend configuration library (which also supports loading from other resources/URLs).
You can pass additional Java arguments via JAVA_OPTS_APPEND environment variable in Keycloak.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem I encounter when I specify that is that the init method still runs. So even though the underlying library supports the -Dconfig.file override, it appears that it never gets there because of the exceptions thrown when the variables are missing.

Also, if I set both, the variables in the config scope are what gets used.

log.info("Configuring CqlSession Builder");
return cqlSessionBuilder
.withConfigLoader(DriverConfigLoader.programmaticBuilder()
// Resolves the timeout query for create table timed out after PT2S
.withDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(60000))
.withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofMillis(60000))
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofMillis(15000))
.build());
}

private void createDbIfNotExists(List<InetSocketAddress> contactPointsList, String username, String password, String localDatacenter, String keyspace, int replicationFactor) {
try (CqlSession createKeyspaceSession =
CqlSession.builder()
CqlSession.builder()
.addContactPoints(contactPointsList)
.withAuthCredentials(username, password)
.withLocalDatacenter(localDatacenter)
Expand All @@ -176,7 +195,7 @@ private void createDbIfNotExists(List<InetSocketAddress> contactPointsList, Stri

log.info("Create schema...");
try (CqlSession createKeyspaceSession =
CqlSession.builder()
configure(CqlSession.builder())
.addContactPoints(contactPointsList)
.withAuthCredentials(username, password)
.withLocalDatacenter(localDatacenter)
Expand Down Expand Up @@ -265,6 +284,10 @@ private CompositeRepository createRepository(CqlSession cqlSession) {
.build();
ClientScopeRepository clientScopeRepository = new CassandraClientScopeRepository(clientScopeMapper.clientScopeDao());

EventMapper eventMapper = new EventMapperBuilder(cqlSession).withSchemaValidationEnabled(false)
.build();
EventRepository eventRepository = new CassandraEventRepository(eventMapper.eventDao());

ManagedCompositeCassandraRepository cassandraRepository = new ManagedCompositeCassandraRepository();
cassandraRepository.setRoleRepository(roleRepository);
cassandraRepository.setGroupRepository(groupRepository);
Expand All @@ -276,6 +299,7 @@ private CompositeRepository createRepository(CqlSession cqlSession) {
cassandraRepository.setSingleUseObjectRepository(singleUseObjectRepository);
cassandraRepository.setClientRepository(clientRepository);
cassandraRepository.setClientScopeRepository(clientScopeRepository);
cassandraRepository.setEventRepository(eventRepository);

return cassandraRepository;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2024 Phase Two, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.arbeitsagentur.opdt.keycloak.cassandra.event;

import static de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.Converters.*;

import de.arbeitsagentur.opdt.keycloak.cassandra.event.persistence.EventRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.jbosslog.JBossLog;
import org.keycloak.events.Event;
import org.keycloak.events.EventQuery;
import org.keycloak.events.EventStoreProvider;
import org.keycloak.events.admin.AdminEvent;
import org.keycloak.events.admin.AdminEventQuery;
import org.keycloak.models.RealmModel;

@JBossLog
@RequiredArgsConstructor
public class CassandraEventStoreProvider implements EventStoreProvider {

private final EventRepository repository;

@Override
public void onEvent(Event event) {
repository.insertEvent(eventToEntity(event));
}

@Override
public void onEvent(AdminEvent event, boolean includeRepresentation) {
repository.insertAdminEvent(adminEventToEntity(event, includeRepresentation));
}

@Override
public EventQuery createQuery() {
return repository.eventQuery();
}

@Override
public AdminEventQuery createAdminQuery() {
return repository.adminEventQuery();
}

@Override
public void clear() {
throw new UnsupportedOperationException(
"clear() is deprecated and not supported in this implementation");
}

@Override
public void clear(RealmModel realm) {
repository.deleteRealmEvents(realm.getId());
}

@Override
public void clear(RealmModel realm, long olderThan) {
repository.deleteRealmEvents(realm.getId(), olderThan);
}

@Override
public void clearExpiredEvents() {
throw new UnsupportedOperationException(
"clearExpiredEvents() is deprecated and not supported in this implementation");
}

@Override
public void clearAdmin() {
throw new UnsupportedOperationException(
"clearAdmin() is deprecated and not supported in this implementation");
}

@Override
public void clearAdmin(RealmModel realm) {
repository.deleteAdminRealmEvents(realm.getId());
}

@Override
public void clearAdmin(RealmModel realm, long olderThan) {
repository.deleteAdminRealmEvents(realm.getId(), olderThan);
}

@Override
public void close() {
// Nothing to do
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2024 Phase Two, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.arbeitsagentur.opdt.keycloak.cassandra.event;

import static de.arbeitsagentur.opdt.keycloak.common.CommunityProfiles.isCassandraCacheProfileEnabled;
import static de.arbeitsagentur.opdt.keycloak.common.CommunityProfiles.isCassandraProfileEnabled;
import static de.arbeitsagentur.opdt.keycloak.common.ProviderHelpers.createProviderCached;
import static org.keycloak.userprofile.DeclarativeUserProfileProvider.PROVIDER_PRIORITY;

import com.google.auto.service.AutoService;
import de.arbeitsagentur.opdt.keycloak.cassandra.connection.CassandraConnectionProvider;
import org.keycloak.Config;
import org.keycloak.events.EventStoreProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.EnvironmentDependentProviderFactory;

@AutoService(EventStoreProviderFactory.class)
public class CassandraEventStoreProviderFactory
implements EventStoreProviderFactory, EnvironmentDependentProviderFactory {

@Override
public CassandraEventStoreProvider create(KeycloakSession session) {
CassandraConnectionProvider cassandraConnectionProvider =
createProviderCached(session, CassandraConnectionProvider.class);
return new CassandraEventStoreProvider(cassandraConnectionProvider.getRepository());
}

@Override
public void init(Config.Scope config) {}

@Override
public void postInit(KeycloakSessionFactory factory) {}

@Override
public void close() {}

@Override
public String getId() {
return "cassandra";
}

@Override
public int order() {
return PROVIDER_PRIORITY + 1;
}

@Override
public boolean isSupported() {
return isCassandraProfileEnabled() || isCassandraCacheProfileEnabled();
}
}
Loading