Stream simulation results in their own thread #1630

Open · wants to merge 4 commits into base: develop
PostgresProfileQueryHandler.java (new file)
@@ -0,0 +1,190 @@
package gov.nasa.jpl.aerie.merlin.worker.postgres;

import gov.nasa.jpl.aerie.json.JsonParser;
import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfile;
import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfiles;
import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
import gov.nasa.jpl.aerie.merlin.protocol.types.RealDynamics;
import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.DatabaseException;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedInsertException;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedUpdateException;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements;
import org.apache.commons.lang3.tuple.Pair;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;

import static gov.nasa.jpl.aerie.merlin.driver.json.SerializedValueJsonParser.serializedValueP;
import static gov.nasa.jpl.aerie.merlin.server.http.ProfileParsers.realDynamicsP;
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.discreteProfileTypeP;
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.realProfileTypeP;

/**
* Utility class to handle upload of resource profiles to the database. This class allows for each upload to have its
* own Connection object.
Comment on lines +28 to +29
Contributor:
Nonblocking Suggestion: remove the second part of this class comment, since "each upload having its own connection object" isn't an inherent property of the class (the same instance can work for multiple uploads):

Suggested change
* Utility class to handle upload of resource profiles to the database. This class allows for each upload to have its
* own Connection object.
* Utility class to handle upload of resource profiles to the database.
*/
public class PostgresProfileQueryHandler implements AutoCloseable {
private final Connection connection;
private final HashMap<String, Integer> profileIds;
private final HashMap<String, Duration> profileDurations;

private PreparedStatement postProfileStatement;
private PreparedStatement postSegmentsStatement;
private PreparedStatement updateDurationStatement;
Comment on lines +36 to +38
Contributor:
If prepareStatements was inlined into the constructor, these could be final.
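For illustration, a minimal sketch of that refactor (illustrative only; POST_PROFILES_SQL, POST_SEGMENTS_SQL, and UPDATE_DURATION_SQL are hypothetical static final String constants holding the SQL text blocks currently built in prepareStatements):

private final PreparedStatement postProfileStatement;
private final PreparedStatement postSegmentsStatement;
private final PreparedStatement updateDurationStatement;

public PostgresProfileQueryHandler(DataSource dataSource, long datasetId) throws SQLException {
  this.connection = dataSource.getConnection();
  this.profileIds = new HashMap<>();
  this.profileDurations = new HashMap<>();
  this.datasetId = datasetId;
  // Each statement field is assigned exactly once here, so `final` compiles.
  this.postProfileStatement = connection.prepareStatement(
      POST_PROFILES_SQL.formatted(datasetId), PreparedStatement.RETURN_GENERATED_KEYS);
  this.postSegmentsStatement = connection.prepareStatement(
      POST_SEGMENTS_SQL.formatted(datasetId), PreparedStatement.NO_GENERATED_KEYS);
  this.updateDurationStatement = connection.prepareStatement(
      UPDATE_DURATION_SQL.formatted(datasetId), PreparedStatement.NO_GENERATED_KEYS);
}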


private long datasetId;
Contributor:
This can be final.


public PostgresProfileQueryHandler(DataSource dataSource, long datasetId) throws SQLException {
this.connection = dataSource.getConnection();
profileIds = new HashMap<>();
profileDurations = new HashMap<>();
this.datasetId = datasetId;
prepareStatements();
}

public void prepareStatements() throws SQLException {
final String postProfilesSql =
//language=sql
"""
insert into merlin.profile (dataset_id, name, type, duration)
values (%d, ?, ?::jsonb, ?::interval)
on conflict (dataset_id, name) do nothing
""".formatted(datasetId);
final String postSegmentsSql =
//language=sql
"""
insert into merlin.profile_segment (dataset_id, profile_id, start_offset, dynamics, is_gap)
values (%d, ?, ?::interval, ?::jsonb, false)
""".formatted(datasetId);
final String updateDurationSql =
//language=SQL
"""
update merlin.profile
set duration = ?::interval
where (dataset_id, id) = (%d, ?);
""".formatted(datasetId);

postProfileStatement = connection.prepareStatement(postProfilesSql, PreparedStatement.RETURN_GENERATED_KEYS);
postSegmentsStatement = connection.prepareStatement(postSegmentsSql, PreparedStatement.NO_GENERATED_KEYS);
updateDurationStatement = connection.prepareStatement(updateDurationSql, PreparedStatement.NO_GENERATED_KEYS);
}

/**
* Upload profiles, profile segments, and corresponding profile durations to the database.
*/
public void uploadResourceProfiles(final ResourceProfiles resourceProfiles) {
try {
// Add new profiles to DB
for (final var realEntry : resourceProfiles.realProfiles().entrySet()) {
if (!profileIds.containsKey(realEntry.getKey())) {
addRealProfileToBatch(realEntry.getKey(), realEntry.getValue());
}
}
for (final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) {
if (!profileIds.containsKey(discreteEntry.getKey())) {
addDiscreteProfileToBatch(discreteEntry.getKey(), discreteEntry.getValue());
}
}
postProfiles();

// Post Segments
for (final var realEntry : resourceProfiles.realProfiles().entrySet()) {
addProfileSegmentsToBatch(realEntry.getKey(), realEntry.getValue(), realDynamicsP);
}
for (final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) {
addProfileSegmentsToBatch(discreteEntry.getKey(), discreteEntry.getValue(), serializedValueP);
}

postProfileSegments();
updateProfileDurations();
} catch (SQLException ex) {
throw new DatabaseException("Exception occurred while posting profiles.", ex);
}
}

private void addRealProfileToBatch(final String name, ResourceProfile<RealDynamics> profile) throws SQLException {
postProfileStatement.setString(1, name);
postProfileStatement.setString(2, realProfileTypeP.unparse(Pair.of("real", profile.schema())).toString());
PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO);

postProfileStatement.addBatch();

profileDurations.put(name, Duration.ZERO);
}

private void addDiscreteProfileToBatch(final String name, ResourceProfile<SerializedValue> profile) throws SQLException {
postProfileStatement.setString(1, name);
postProfileStatement.setString(2, discreteProfileTypeP.unparse(Pair.of("discrete", profile.schema())).toString());
PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO);

postProfileStatement.addBatch();

profileDurations.put(name, Duration.ZERO);
}

/**
* Insert the batched profiles and cache their ids for future use.
*
* This method takes advantage of the fact that we're using the Postgres JDBC,
* which returns all columns when executing batches with `getGeneratedKeys`.
*/
private void postProfiles() throws SQLException {
final var results = this.postProfileStatement.executeBatch();
for (final var result : results) {
if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile");
}

final var resultSet = this.postProfileStatement.getGeneratedKeys();
while(resultSet.next()){
profileIds.put(resultSet.getString("name"), resultSet.getInt("id"));
}
}

private void postProfileSegments() throws SQLException {
final var results = this.postSegmentsStatement.executeBatch();
for (final var result : results) {
if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile_segment");
}
}

private void updateProfileDurations() throws SQLException {
final var results = this.updateDurationStatement.executeBatch();
for (final var result : results) {
if (result == Statement.EXECUTE_FAILED) throw new FailedUpdateException("merlin.profile");
}
}

private <T> void addProfileSegmentsToBatch(final String name, ResourceProfile<T> profile, JsonParser<T> dynamicsP) throws SQLException {
final var id = profileIds.get(name);
this.postSegmentsStatement.setLong(1, id);

var newDuration = profileDurations.get(name);
for (final var segment : profile.segments()) {
PreparedStatements.setDuration(this.postSegmentsStatement, 2, newDuration);
final var dynamics = dynamicsP.unparse(segment.dynamics()).toString();
this.postSegmentsStatement.setString(3, dynamics);
this.postSegmentsStatement.addBatch();

newDuration = newDuration.plus(segment.extent());
}

this.updateDurationStatement.setLong(2, id);
PreparedStatements.setDuration(this.updateDurationStatement, 1, newDuration);
this.updateDurationStatement.addBatch();

profileDurations.put(name, newDuration);
}

@Override
public void close() throws SQLException {
this.postProfileStatement.close();
this.postSegmentsStatement.close();
this.updateDurationStatement.close();
this.connection.close();
}
}
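For reference, a minimal usage sketch of the class above (dataSource, datasetId, and resourceProfiles are assumed to be in scope; this mirrors how the streamer in the next file invokes the handler from a queued task):

try (var handler = new PostgresProfileQueryHandler(dataSource, datasetId)) {
  // Opens one pooled connection for the lifetime of the try block, uploads
  // profiles, segments, and durations, then closes the statements and connection.
  handler.uploadResourceProfiles(resourceProfiles);
} catch (SQLException e) {
  throw new DatabaseException("Exception occurred while posting profiles.", e);
}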
PostgresProfileStreamer.java
@@ -1,180 +1,37 @@
package gov.nasa.jpl.aerie.merlin.worker.postgres;

import gov.nasa.jpl.aerie.json.JsonParser;
import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfile;
import gov.nasa.jpl.aerie.merlin.driver.resources.ResourceProfiles;
import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
import gov.nasa.jpl.aerie.merlin.protocol.types.RealDynamics;
import gov.nasa.jpl.aerie.merlin.protocol.types.SerializedValue;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.DatabaseException;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedInsertException;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.FailedUpdateException;
import gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PreparedStatements;
import org.apache.commons.lang3.tuple.Pair;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.function.Consumer;

import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.discreteProfileTypeP;
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.realProfileTypeP;

import static gov.nasa.jpl.aerie.merlin.driver.json.SerializedValueJsonParser.serializedValueP;
import static gov.nasa.jpl.aerie.merlin.server.http.ProfileParsers.realDynamicsP;

public class PostgresProfileStreamer implements Consumer<ResourceProfiles>, AutoCloseable {
-  private final Connection connection;
-  private final HashMap<String, Integer> profileIds;
-  private final HashMap<String, Duration> profileDurations;
-
-  private final PreparedStatement postProfileStatement;
-  private final PreparedStatement postSegmentsStatement;
-  private final PreparedStatement updateDurationStatement;
+  private final DataSource dataSource;
+  private long datasetId;
+  private PostgresQueryQueue queryQueue;
Comment on lines +12 to +13
Contributor:
These can be final.


  public PostgresProfileStreamer(DataSource dataSource, long datasetId) throws SQLException {
-    this.connection = dataSource.getConnection();
-    profileIds = new HashMap<>();
-    profileDurations = new HashMap<>();
-
-    final String postProfilesSql =
-      //language=sql
-      """
-      insert into merlin.profile (dataset_id, name, type, duration)
-      values (%d, ?, ?::jsonb, ?::interval)
-      on conflict (dataset_id, name) do nothing
-      """.formatted(datasetId);
-    final String postSegmentsSql =
-      //language=sql
-      """
-      insert into merlin.profile_segment (dataset_id, profile_id, start_offset, dynamics, is_gap)
-      values (%d, ?, ?::interval, ?::jsonb, false)
-      """.formatted(datasetId);
-    final String updateDurationSql =
-      //language=SQL
-      """
-      update merlin.profile
-      set duration = ?::interval
-      where (dataset_id, id) = (%d, ?);
-      """.formatted(datasetId);
-
-    postProfileStatement = connection.prepareStatement(postProfilesSql, PreparedStatement.RETURN_GENERATED_KEYS);
-    postSegmentsStatement = connection.prepareStatement(postSegmentsSql, PreparedStatement.NO_GENERATED_KEYS);
-    updateDurationStatement = connection.prepareStatement(updateDurationSql, PreparedStatement.NO_GENERATED_KEYS);
+    this.dataSource = dataSource;
+    this.datasetId = datasetId;
+    this.queryQueue = new PostgresQueryQueue();
  }

  @Override
  public void accept(final ResourceProfiles resourceProfiles) {
-    try {
-      // Add new profiles to DB
-      for(final var realEntry : resourceProfiles.realProfiles().entrySet()){
-        if(!profileIds.containsKey(realEntry.getKey())){
-          addRealProfileToBatch(realEntry.getKey(), realEntry.getValue());
-        }
-      }
-      for(final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) {
-        if(!profileIds.containsKey(discreteEntry.getKey())){
-          addDiscreteProfileToBatch(discreteEntry.getKey(), discreteEntry.getValue());
-        }
-      }
-      postProfiles();
-
-      // Post Segments
-      for(final var realEntry : resourceProfiles.realProfiles().entrySet()){
-        addProfileSegmentsToBatch(realEntry.getKey(), realEntry.getValue(), realDynamicsP);
-      }
+    queryQueue.addToQueue(() -> {
+      try (var transaction = new PostgresProfileQueryHandler(dataSource, datasetId)) {
Contributor:
Why make a new Query Handler per-transaction instead of having a single handler instance that is closed in the close method? Asking as I'm concerned about running out of DB connections if the queue gets backed up, since the constructor for this class opens a new one.

Author:
Good question. Creating a single handler instance caused me to run into a PSQLException: This statement has been closed error. I believe this was due to the HikariCP lifecycle auto-closing the connection. The good news is that each PostgresProfileQueryHandler will be created as part of the queued task, so it won't be created until the previous one is completed. So we should only have one DB connection at a time.

That said, I'm not sure of the overhead of creating "new" connections. I assumed HikariCP maintaining a connection pool meant that the overhead was low, but I could be wrong. There's likely a way of maintaining a single handler if we think the overhead is non-negligible.

Contributor (@Mythicaeda, Feb 10, 2025):
Hikari does not auto-close the connection, otherwise we would have run into this issue with the develop implementation, alongside several other Hikari DB connections that have long lifespans with low activity.

Let me amend this: if Hikari auto-closes a connection, it recreates one to refill the pool.

Contributor (@Mythicaeda, Feb 10, 2025):
Additionally, this statement has been closed isn't about the connection, it's about a Statement object, which has a lifespan independent of the connection object. Basically, when you use the connection to make a new Statement, the prior statement (and corresponding ResultSet) is closed, and trying to access it gives the error you received.
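The sequencing claim above depends on PostgresQueryQueue running queued tasks one at a time. That class isn't shown in this diff; a minimal sketch of such a queue, assuming it simply wraps a single-thread executor, could look like:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: tasks run sequentially on one worker thread, so at most
// one PostgresProfileQueryHandler (and thus one DB connection) is live at a time.
public class PostgresQueryQueue {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  public void addToQueue(final Runnable task) {
    executor.submit(task);
  }

  public void shutdown() {
    executor.shutdown(); // already-queued tasks still drain before termination
    try {
      executor.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}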

+        transaction.uploadResourceProfiles(resourceProfiles);
+      } catch (SQLException e) {
+        throw new DatabaseException("Exception occurred while posting profiles.", e);
+      }
-      for(final var discreteEntry : resourceProfiles.discreteProfiles().entrySet()) {
-        addProfileSegmentsToBatch(discreteEntry.getKey(), discreteEntry.getValue(), serializedValueP);
-      }
-
-      postProfileSegments();
-      updateProfileDurations();
-    } catch (SQLException ex) {
-      throw new DatabaseException("Exception occurred while posting profiles.", ex);
-    }
}

-  private void addRealProfileToBatch(final String name, ResourceProfile<RealDynamics> profile) throws SQLException {
-    postProfileStatement.setString(1, name);
-    postProfileStatement.setString(2, realProfileTypeP.unparse(Pair.of("real", profile.schema())).toString());
-    PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO);
-
-    postProfileStatement.addBatch();
-
-    profileDurations.put(name, Duration.ZERO);
-  }
-
-  private void addDiscreteProfileToBatch(final String name, ResourceProfile<SerializedValue> profile) throws SQLException {
-    postProfileStatement.setString(1, name);
-    postProfileStatement.setString(2, discreteProfileTypeP.unparse(Pair.of("discrete", profile.schema())).toString());
-    PreparedStatements.setDuration(this.postProfileStatement, 3, Duration.ZERO);
-
-    postProfileStatement.addBatch();
-
-    profileDurations.put(name, Duration.ZERO);
-  }
-
-  /**
-   * Insert the batched profiles and cache their ids for future use.
-   *
-   * This method takes advantage of the fact that we're using the Postgres JDBC,
-   * which returns all columns when executing batches with `getGeneratedKeys`.
-   */
-  private void postProfiles() throws SQLException {
-    final var results = this.postProfileStatement.executeBatch();
-    for (final var result : results) {
-      if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile_segment");
-    }
-
-    final var resultSet = this.postProfileStatement.getGeneratedKeys();
-    while(resultSet.next()){
-      profileIds.put(resultSet.getString("name"), resultSet.getInt("id"));
-    }
-  }
-
-  private void postProfileSegments() throws SQLException {
-    final var results = this.postSegmentsStatement.executeBatch();
-    for (final var result : results) {
-      if (result == Statement.EXECUTE_FAILED) throw new FailedInsertException("merlin.profile_segment");
-    }
-  }
-
-  private void updateProfileDurations() throws SQLException {
-    final var results = this.updateDurationStatement.executeBatch();
-    for (final var result : results) {
-      if (result == Statement.EXECUTE_FAILED) throw new FailedUpdateException("merlin.profile");
-    }
-  }
-
-  private <T> void addProfileSegmentsToBatch(final String name, ResourceProfile<T> profile, JsonParser<T> dynamicsP) throws SQLException {
-    final var id = profileIds.get(name);
-    this.postSegmentsStatement.setLong(1, id);
-
-    var newDuration = profileDurations.get(name);
-    for (final var segment : profile.segments()) {
-      PreparedStatements.setDuration(this.postSegmentsStatement, 2, newDuration);
-      final var dynamics = dynamicsP.unparse(segment.dynamics()).toString();
-      this.postSegmentsStatement.setString(3, dynamics);
-      this.postSegmentsStatement.addBatch();
-
-      newDuration = newDuration.plus(segment.extent());
-    }
-
-    this.updateDurationStatement.setLong(2, id);
-    PreparedStatements.setDuration(this.updateDurationStatement, 1, newDuration);
-    this.updateDurationStatement.addBatch();
-
-    profileDurations.put(name, newDuration);
+    });
  }

  @Override
-  public void close() throws SQLException {
-    this.postProfileStatement.close();
-    this.postSegmentsStatement.close();
-    this.updateDurationStatement.close();
-    this.connection.close();
+  public void close() {
+    queryQueue.shutdown();
}

}