From 1adc4ce180dd051be78a98d9e838113a81a84b7d Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Fri, 9 Sep 2016 14:49:58 +0200 Subject: [PATCH 01/11] test: Fix wrong assert in PersistedEntryTest --- .../java/org/atomhopper/adapter/jpa/PersistedEntryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java b/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java index c5fd4947..83b1a955 100644 --- a/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java +++ b/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java @@ -67,7 +67,7 @@ public void shouldReturnDateLastUpdated() throws Exception { date = persistedEntry.getDateLastUpdated(); assertEquals("Getting the date last updated should return a date object.", persistedEntry.getDateLastUpdated(), date); persistedEntry.setDateLastUpdated(new Date()); - assertNotSame("Setting the date last updated should change last updated date.", persistedEntry.getDateLastUpdated().equals(date)); + assertNotSame("Setting the date last updated should change last updated date.", persistedEntry.getDateLastUpdated(), date); } @Test From 24c360632444adb27d47d3ef04c161d84d369562 Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Fri, 9 Sep 2016 14:50:57 +0200 Subject: [PATCH 02/11] test: Assert response code in FeedTagTest This makes it easier to debug issues that case 500 errors. --- .../src/test/java/org/atomhopper/FeedTagTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test-suite/src/test/java/org/atomhopper/FeedTagTest.java b/test-suite/src/test/java/org/atomhopper/FeedTagTest.java index 2bab0dd2..4166dcc6 100644 --- a/test-suite/src/test/java/org/atomhopper/FeedTagTest.java +++ b/test-suite/src/test/java/org/atomhopper/FeedTagTest.java @@ -5,6 +5,8 @@ import org.apache.abdera.model.Entry; import org.apache.abdera.model.Source; import org.apache.abdera.protocol.client.AbderaClient; +import org.apache.abdera.protocol.client.ClientResponse; +import org.apache.commons.httpclient.HttpStatus; import org.junit.Before; import org.junit.Test; import org.junit.experimental.runners.Enclosed; @@ -124,9 +126,11 @@ public void setUp() throws Exception { //entry.setUpdated(date); //This needs to be auto-generated. //report("The Entry to Post", entry.toString()); //Commented out to reduced build logs. - String postResponse = abderaClient.post("http://localhost:" + getPort() + "/namespace/feed/", entry).getDocument().getRoot().toString(); - doc = xml.toDOM(postResponse); - //report("The Created Entry", postResponse); //Commented out to reduced build logs. + ClientResponse postResponse = abderaClient.post("http://localhost:" + getPort() + "/namespace/feed/", entry); + String postResponseText = postResponse.getDocument().getRoot().toString(); + //report("The Created Entry", postResponseText); //Commented out to reduced build logs. + assertEquals(postResponse.getStatus(), HttpStatus.SC_CREATED); + doc = xml.toDOM(postResponseText); } @Test From c03a5d75b673c7b6465d5a5cf87ca2bcb7671167 Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Wed, 5 Oct 2016 09:50:18 +0200 Subject: [PATCH 03/11] test: align AtomHopperJettyServerBuilder impls The implementation of AtomHopperJettyServerBuilder in test-suite did not have a constructor that takes config file location as argument, as the one in ah-jetty-server did. This change syncs the one in test-suite up with the one in ah-jetty-server. 
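For reference, a hedged usage sketch of the two-argument constructor added below (not part of the patch; the class name, port number, and configuration path are placeholders for illustration only):

    import org.atomhopper.jetty.AtomHopperJettyServerBuilder;
    import org.eclipse.jetty.server.Server;

    public class StartConfiguredTestServer {
        public static void main(String[] args) throws Exception {
            // Placeholder port and configuration location, only to show the new constructor.
            Server server = new AtomHopperJettyServerBuilder(
                    8080, "classpath:/META-INF/atom-server.cfg.xml").newServer();
            server.setStopAtShutdown(true);
            server.start();
            server.join();
        }
    }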
--- .../jetty/AtomHopperJettyServerBuilder.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/test-suite/src/main/java/org/atomhopper/jetty/AtomHopperJettyServerBuilder.java b/test-suite/src/main/java/org/atomhopper/jetty/AtomHopperJettyServerBuilder.java index 1bc203c7..db129053 100644 --- a/test-suite/src/main/java/org/atomhopper/jetty/AtomHopperJettyServerBuilder.java +++ b/test-suite/src/main/java/org/atomhopper/jetty/AtomHopperJettyServerBuilder.java @@ -9,18 +9,21 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.springframework.web.context.ContextLoaderListener; -/** - * - * - */ + public class AtomHopperJettyServerBuilder { private final int portNumber; + private String configurationPathAndFile = ""; public AtomHopperJettyServerBuilder(int portNumber) { this.portNumber = portNumber; } + public AtomHopperJettyServerBuilder(int portNumber, String configurationPathAndFile) { + this.portNumber = portNumber; + this.configurationPathAndFile = configurationPathAndFile; + } + private Server buildNewInstance() { final Server jettyServerReference = new Server(portNumber); final ServletContextHandler rootContext = buildRootContext(jettyServerReference); @@ -28,7 +31,11 @@ private Server buildNewInstance() { final ServletHolder atomHopServer = new ServletHolder(AtomHopperServlet.class); final ServletHolder versionServlet = new ServletHolder(AtomHopperVersionServlet.class); atomHopServer.setInitParameter(ServletInitParameter.CONTEXT_ADAPTER_CLASS.toString(), ServletSpringContext.class.getName()); - atomHopServer.setInitParameter(ServletInitParameter.CONFIGURATION_LOCATION.toString(), "classpath:/META-INF/atom-server.cfg.xml"); + if(configurationPathAndFile.length() <= 0) { + atomHopServer.setInitParameter(ServletInitParameter.CONFIGURATION_LOCATION.toString(), "classpath:/META-INF/atom-server.cfg.xml"); + } else { + atomHopServer.setInitParameter(ServletInitParameter.CONFIGURATION_LOCATION.toString(), configurationPathAndFile); + } rootContext.addServlet(versionServlet, "/buildinfo"); rootContext.addServlet(atomHopServer, "/*"); @@ -47,4 +54,4 @@ private ServletContextHandler buildRootContext(Server serverReference) { public Server newServer() { return buildNewInstance(); } -} +} \ No newline at end of file From d07242209b33eb01156b5271620a1f16ce3c03c2 Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Tue, 27 Sep 2016 13:52:11 +0200 Subject: [PATCH 04/11] test: Programs for verifying following by marker MarkerRaceConditionTest posts multiple entries in parallel, while following the feed using the marker mechanism. The code is written to try to maximize the risk of having multiple entries being handled in parallel by the server. If any entry is written with an older timestamp than the latest, then that entry will be missed, and this will be revealed by the program. 
--- pom.xml | 9 +- .../java/org/atomhopper/FollowByMarker.java | 121 ++++++++++++++++ .../atomhopper/MarkerRaceConditionTest.java | 136 ++++++++++++++++++ .../java/org/atomhopper/PostInParallel.java | 122 ++++++++++++++++ .../test/resources/jetty-logging.properties | 2 + test-suite/src/test/resources/logback.xml | 15 ++ .../application-context-h2.xml | 39 +++++ .../marker-race-condition/atom-server.cfg.xml | 18 +++ 8 files changed, 461 insertions(+), 1 deletion(-) create mode 100644 test-suite/src/test/java/org/atomhopper/FollowByMarker.java create mode 100644 test-suite/src/test/java/org/atomhopper/MarkerRaceConditionTest.java create mode 100644 test-suite/src/test/java/org/atomhopper/PostInParallel.java create mode 100644 test-suite/src/test/resources/jetty-logging.properties create mode 100644 test-suite/src/test/resources/logback.xml create mode 100644 test-suite/src/test/resources/marker-race-condition/application-context-h2.xml create mode 100644 test-suite/src/test/resources/marker-race-condition/atom-server.cfg.xml diff --git a/pom.xml b/pom.xml index 557b4393..322f9449 100644 --- a/pom.xml +++ b/pom.xml @@ -248,7 +248,14 @@ 1.6.5 - + + + org.slf4j + slf4j-simple + 1.6.5 + + + commons-httpclient commons-httpclient 3.1 diff --git a/test-suite/src/test/java/org/atomhopper/FollowByMarker.java b/test-suite/src/test/java/org/atomhopper/FollowByMarker.java new file mode 100644 index 00000000..5a560cfc --- /dev/null +++ b/test-suite/src/test/java/org/atomhopper/FollowByMarker.java @@ -0,0 +1,121 @@ +package org.atomhopper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.abdera.Abdera; +import org.apache.abdera.i18n.iri.IRI; +import org.apache.abdera.model.Document; +import org.apache.abdera.model.Entry; +import org.apache.abdera.model.Feed; +import org.apache.abdera.protocol.Response; +import org.apache.abdera.protocol.client.AbderaClient; +import org.apache.abdera.protocol.client.ClientResponse; +import org.apache.abdera.protocol.client.RequestOptions; + +public class FollowByMarker implements Runnable { + private final String url; + private final String prefix; + private final Abdera abdera = new Abdera(); + private final AbderaClient client; + + + public static void main(String[] args) throws IOException { + final String url = args.length >= 1 ? args[0] : "http://localhost:8080/namespace/feed"; + final String prefix = args.length >= 2 ? 
args[1] : "fe3b98c9-d86a-4d93-afd5-c8d10c542882"; + + FollowByMarker followByMarker = new FollowByMarker(url, prefix); + followByMarker.run(); + } + + public FollowByMarker(String url, String prefix) { + this.url = url; + this.prefix = prefix; + client = new AbderaClient(abdera); + } + + boolean shouldRun; + Thread thread; + IRI marker = null; + List seen = new ArrayList(); + + public Thread start(String name) { + shouldRun = true; + thread = new Thread(this, name); + thread.start(); + return thread; + } + + public List stop() throws InterruptedException { + shouldRun = false; + thread.interrupt(); + thread.join(); + fetchOnce(); + return seen; + } + + public void run() { + + System.out.println("Looking for first entry with prefix " + prefix + " on " + url); + while (shouldRun && marker == null) { + System.out.println("GET " + url); + ClientResponse resp = client.get(url + "?limit=1000", new RequestOptions(true)); + if (resp.getType() == Response.ResponseType.SUCCESS) { + Document doc = resp.getDocument(); + System.out.println("Got " + doc.getRoot().getEntries().size() + " entries"); + if (!doc.getRoot().getEntries().isEmpty()) { + System.out.println("First title: " + doc.getRoot().getEntries().get(0).getTitle()); + } + for (Entry entry : doc.getRoot().getEntries()) { + if (!entry.getTitle().startsWith(prefix)) { + continue; + } + if (marker == null) { + marker = entry.getId(); + } + seen.add(entry.getTitle()); + } + } else { + System.out.println(resp.getType().name()); + } + } + + System.out.println("Starting at marker " + marker); + + long lastPrintout = System.nanoTime(); + + while (shouldRun) { + long now = System.nanoTime(); + if (now > lastPrintout + 1000000000) { + System.out.printf("%s: %d\n", new Date(), seen.size()); + lastPrintout = now; + } + + fetchOnce(); + } + } + + public void fetchOnce() { + String withMarker = url + "?marker=" + marker.toString(); + //System.out.println("Fetching " + withMarker); + ClientResponse resp = client.get(withMarker, new RequestOptions(true)); + if (resp.getType() == Response.ResponseType.SUCCESS) { + Document doc = resp.getDocument(); + final List entries = doc.getRoot().getEntries(); + if (entries.isEmpty()) { + return; + } + marker = entries.get(0).getId(); + for (Entry entry : entries) { + if (!entry.getTitle().startsWith(prefix)) { + continue; + } + seen.add(entry.getTitle()); + } + } else { + System.out.println(resp.getType().name()); + } + } +} diff --git a/test-suite/src/test/java/org/atomhopper/MarkerRaceConditionTest.java b/test-suite/src/test/java/org/atomhopper/MarkerRaceConditionTest.java new file mode 100644 index 00000000..3c013b9d --- /dev/null +++ b/test-suite/src/test/java/org/atomhopper/MarkerRaceConditionTest.java @@ -0,0 +1,136 @@ +package org.atomhopper; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.StringRequestEntity; +import org.atomhopper.servlet.ServletInitParameter; +import org.atomhopper.servlet.ServletSpringContext; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.springframework.web.context.ContextLoaderListener; + 
+public class MarkerRaceConditionTest { + public static void main(String[] args) throws Exception { + final int port = 8080; + final int numPosters = 20; + final int numEntries = 10000; + + String url = "http://localhost:" + port + "/namespace/feed"; + String prefix = UUID.randomUUID().toString(); + String config = getResource("marker-race-condition/atom-server.cfg.xml"); + String context = getResource("marker-race-condition/application-context-h2.xml"); + System.setProperty("org.jboss.logging.provider", "slf4j"); + + Server serverInstance = buildNewInstance(port, config, context); + serverInstance.setStopAtShutdown(true); + serverInstance.start(); + + ensureFeedExists(url); + + final FollowByMarker follower = new FollowByMarker(url, prefix); + follower.start("follower"); + + final AtomicInteger numPosted = new AtomicInteger(); + final AtomicInteger numFailed = new AtomicInteger(); + PostInParallel posters = new PostInParallel(url, prefix, numPosted, numFailed); + long postStart = System.nanoTime(); + posters.run(numPosters, numEntries); + long postEnd = System.nanoTime(); + + List seen = follower.stop(); + Set unique = new HashSet(seen); + List sorted = new ArrayList(seen); + Collections.sort(sorted); + if (unique.size() != seen.size()) { + System.out.printf("Found %d duplicates\n", unique.size() - seen.size()); + for (String s : unique) { + int start = Collections.binarySearch(sorted, s); + int pos = start; + while (pos < sorted.size() - 1 && sorted.get(pos+1).equals(s)) { + pos++; + } + if ((pos - start) > 1) { + System.out.printf("Found %d copies of %s\n", pos - start, s); + } + } + } + + serverInstance.stop(); + + System.out.printf("Posting entries took %f ms\n", (postEnd - postStart) / 1000000.0); + System.out.printf("num posted: %d num failed: %d, num seen: %d\n", numPosted.get(), numFailed.get(), unique.size()); + + for (int poster = 0; poster < numPosters; poster++) { + for (int entry = 0; entry < numEntries; entry++) { + String title = String.format("%s-%s-%s", prefix, entry, poster); + if (Collections.binarySearch(sorted, title) < 0) { + System.out.printf("Did not find %s\n", title); + } + } + } + + System.exit(numPosted.get() == unique.size() ? 0 : 1); + } + + private static String getResource(String resource) { + return MarkerRaceConditionTest.class.getClassLoader().getResource(resource).toString(); + } + + private static void ensureFeedExists(String url) { + boolean posted = false; + while (!posted) { + final PostMethod post = new PostMethod(url); + final String title = "[MarkerRaceConditionTest start]"; + final String body = "" + + "" + title + "" + + "

" + title + "

"; + try { + post.setRequestEntity(new StringRequestEntity(body, "application/atom+xml", "ascii")); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + + final HttpClient httpClient = new HttpClient(); + + try { + final int result = httpClient.executeMethod(post); + if (result != 201) { + continue; + } + posted = true; + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private static Server buildNewInstance(int portNumber, String configurationPathAndFile, String applicationContext) { + final Server server = new Server(portNumber); + final ServletContextHandler servletContext = new ServletContextHandler(server, "/"); + servletContext.getInitParams().put("contextConfigLocation", applicationContext); + servletContext.addEventListener(new ContextLoaderListener()); + + final ServletHolder atomHopServer = new ServletHolder(AtomHopperServlet.class); + atomHopServer.setInitParameter( + ServletInitParameter.CONTEXT_ADAPTER_CLASS.toString(), ServletSpringContext.class.getName() + ); + atomHopServer.setInitParameter( + ServletInitParameter.CONFIGURATION_LOCATION.toString(), configurationPathAndFile + ); + + servletContext.addServlet(atomHopServer, "/*"); + + return server; + } +} diff --git a/test-suite/src/test/java/org/atomhopper/PostInParallel.java b/test-suite/src/test/java/org/atomhopper/PostInParallel.java new file mode 100644 index 00000000..995f391e --- /dev/null +++ b/test-suite/src/test/java/org/atomhopper/PostInParallel.java @@ -0,0 +1,122 @@ +package org.atomhopper; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Date; +import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.StringRequestEntity; + +public class PostInParallel { + private final String url; + private final String prefix; + private final AtomicInteger numPosted; + private final AtomicInteger numFailed; + + public static void main(String[] args) throws IOException, InterruptedException { + final String url = args.length >= 1 ? args[0] : "http://localhost:8080/namespace/feed"; + final String prefix = args.length >= 2 ? 
args[1] : UUID.randomUUID().toString(); + final AtomicInteger numPosted = new AtomicInteger(); + final AtomicInteger numFailed = new AtomicInteger(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + System.out.println("Total posted: " + numPosted.longValue()); + System.out.println("Total failed: " + numFailed.longValue()); + } + })); + + + final int numParallel = 10; + + new PostInParallel(url, prefix, numPosted, numFailed).run(numParallel, 10000); + } + + public PostInParallel(String url, String prefix, AtomicInteger numPosted, AtomicInteger numFailed) { + this.url = url; + this.prefix = prefix; + this.numPosted = numPosted; + this.numFailed = numFailed; + } + + public void run(int numParallel, int numBatches) throws InterruptedException { + final CyclicBarrier barrier = new CyclicBarrier(numParallel); + Thread threads[] = new Thread[numParallel]; + for (int i = 0; i < numParallel; i++) { + final int threadid = i; + threads[i] = new Thread(new PostEntries(threadid, barrier, numBatches), String.format("poster-%d", threadid)); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + } + + private class PostEntries implements Runnable { + private final int threadid; + private final CyclicBarrier barrier; + private final int numBatches; + + public PostEntries(int threadid, CyclicBarrier barrier, int numBatches) { + this.threadid = threadid; + this.barrier = barrier; + this.numBatches = numBatches; + } + + @Override + public void run() { + final HttpClient httpClient = new HttpClient(); + + for (int batch = 0; batch < numBatches; batch++) { + final String title = prefix + "-" + batch + "-" + threadid; + final String body = "" + + "" + title + "" + + ""; + + try { + barrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + + boolean posted = false; + while (!posted) { + final PostMethod post = new PostMethod(url); + try { + post.setRequestEntity(new StringRequestEntity(body, "application/atom+xml", "ascii")); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + try { + final int result = httpClient.executeMethod(post); + if (result != 201) { + numFailed.incrementAndGet(); + System.err.println("Failed to POST entry: " + body); + continue; + } + } catch (Exception e) { + numFailed.incrementAndGet(); + System.err.println("Error when trying to POST entry: " + e); + e.printStackTrace(); + } + posted = true; + } + int postnum = numPosted.incrementAndGet(); + if (postnum % 1000 == 0) { + System.out.printf("%s: %d\n", new Date(), postnum); + } + } + + } + } +} diff --git a/test-suite/src/test/resources/jetty-logging.properties b/test-suite/src/test/resources/jetty-logging.properties new file mode 100644 index 00000000..33d652a6 --- /dev/null +++ b/test-suite/src/test/resources/jetty-logging.properties @@ -0,0 +1,2 @@ +org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StrErrLog +org.eclipse.jetty.LEVEL=INFO diff --git a/test-suite/src/test/resources/logback.xml b/test-suite/src/test/resources/logback.xml new file mode 100644 index 00000000..3f02f46e --- /dev/null +++ b/test-suite/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/test-suite/src/test/resources/marker-race-condition/application-context-h2.xml 
b/test-suite/src/test/resources/marker-race-condition/application-context-h2.xml new file mode 100644 index 00000000..79e7505d --- /dev/null +++ b/test-suite/src/test/resources/marker-race-condition/application-context-h2.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test-suite/src/test/resources/marker-race-condition/atom-server.cfg.xml b/test-suite/src/test/resources/marker-race-condition/atom-server.cfg.xml new file mode 100644 index 00000000..f304062d --- /dev/null +++ b/test-suite/src/test/resources/marker-race-condition/atom-server.cfg.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file From db207d9c36d23f3f15568ab788c1a0406364c2a6 Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Fri, 9 Sep 2016 14:39:30 +0200 Subject: [PATCH 05/11] hibernate: Use dateLastUpdated for pagination When using a marker for pagination, entries were selected by comparing their dateLastUpdated with the creation date of the marker, which causes wrong results when using an updated marker. --- .../org/atomhopper/hibernate/HibernateFeedRepository.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java index 460b2f97..0fd6af9b 100644 --- a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java +++ b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java @@ -162,13 +162,13 @@ public List perform(Session liveSession) { switch (direction) { case FORWARD: - criteria.add(Restrictions.gt(DATE_LAST_UPDATED, markerEntry.getCreationDate())).addOrder(Order.asc(DATE_LAST_UPDATED)); + criteria.add(Restrictions.gt(DATE_LAST_UPDATED, markerEntry.getDateLastUpdated())).addOrder(Order.asc(DATE_LAST_UPDATED)); feedPage.addAll(criteria.list()); Collections.reverse(feedPage); break; case BACKWARD: - criteria.add(Restrictions.le(DATE_LAST_UPDATED, markerEntry.getCreationDate())).addOrder(Order.desc(DATE_LAST_UPDATED)); + criteria.add(Restrictions.le(DATE_LAST_UPDATED, markerEntry.getDateLastUpdated())).addOrder(Order.desc(DATE_LAST_UPDATED)); feedPage.addAll(criteria.list()); break; } From b0a93f9fe4e85140d1318ca60c1435d1f9ec816c Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Wed, 17 Aug 2016 10:37:56 +0200 Subject: [PATCH 06/11] Use Java 8 This is required for high-resolution time stamps in Hibernate. --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 322f9449..21b8a936 100644 --- a/pom.xml +++ b/pom.xml @@ -407,8 +407,8 @@ 2.3.2 - 1.6 - 1.6 + 1.8 + 1.8 From 68552a61e3127aad9fafafba18528f85bfd9d4b2 Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Wed, 17 Aug 2016 09:42:03 +0200 Subject: [PATCH 07/11] Use Hibernate 5.1 This, together with the hibernate-java8 module, enables use of high-resolution timestamps. In 5.2, the criteria support has been deprecated, and warnings are logged on each use of them. Migrating to a different mechanism is a large, separate task, so stay on 5.1 for now. 
--- .../HibernateFeedRepositoryTest.java | 42 ++----------------- hopper/pom.xml | 7 +++- pom.xml | 12 ++++-- 3 files changed, 18 insertions(+), 43 deletions(-) diff --git a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java index 5cc02888..d5840df4 100644 --- a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java +++ b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java @@ -1,7 +1,5 @@ package org.atomhopper.hibernate; -import static org.mockito.Mockito.mock; - import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; @@ -20,8 +18,6 @@ import org.atomhopper.adapter.jpa.PersistedCategory; import org.atomhopper.adapter.jpa.PersistedEntry; import org.atomhopper.adapter.jpa.PersistedFeed; -import org.atomhopper.dbal.AtomDatabaseException; -import org.atomhopper.hibernate.actions.ComplexSessionAction; import org.atomhopper.hibernate.actions.SimpleSessionAction; import org.hibernate.Session; import org.hibernate.criterion.Restrictions; @@ -39,11 +35,8 @@ @RunWith(Enclosed.class) public class HibernateFeedRepositoryTest { - public static class WhenPerformingSimpleAction { - - HibernateFeedRepository feedRepository; + public static class WhenCreatingMisconfiguredFeedRepository{ Map parameters; - SimpleSessionAction simpleSessionAction; @Before public void setup() throws Exception { @@ -54,40 +47,11 @@ public void setup() throws Exception { parameters.put("hibernate.connection.username", "sa"); parameters.put("hibernate.connection.password", ""); parameters.put("hibernate.hbm2ddl.auto", "update"); - - feedRepository = new HibernateFeedRepository(parameters); - simpleSessionAction = mock(SimpleSessionAction.class); - } - - @Test(expected=AtomDatabaseException.class) - public void shouldThrowAtomDatabaseException() throws Exception { - feedRepository.performSimpleAction(simpleSessionAction); - } - } - - public static class WhenPerformingComplexAction { - - HibernateFeedRepository feedRepository; - Map parameters; - ComplexSessionAction complexSessionAction; - - @Before - public void setup() throws Exception { - parameters = new HashMap(); - parameters.put("hibernate.connection.driver_class", "org.h2.Driver"); - parameters.put("hibernate.dialect", "org.hibernate.dialect.H2Dialect"); - parameters.put("hibernate.connection.username", "sa"); - parameters.put("hibernate.connection.password", ""); - parameters.put("hibernate.hbm2ddl.auto", "update"); - - feedRepository = new HibernateFeedRepository(parameters); - complexSessionAction = mock(ComplexSessionAction.class); } - /*This should throw the error because */ - @Test(expected=AtomDatabaseException.class) + @Test(expected=UnsupportedOperationException.class) public void shouldThrowAtomDatabaseException() throws Exception { - feedRepository.performComplexAction(complexSessionAction); + new HibernateFeedRepository(parameters); } } diff --git a/hopper/pom.xml b/hopper/pom.xml index fb316027..c54a407c 100644 --- a/hopper/pom.xml +++ b/hopper/pom.xml @@ -30,7 +30,12 @@ hibernate-core
- + + org.hibernate + hibernate-java8 + + + org.javassist javassist diff --git a/pom.xml b/pom.xml index 21b8a936..7fd95517 100644 --- a/pom.xml +++ b/pom.xml @@ -94,16 +94,22 @@ org.hibernate hibernate-core - 4.1.3.Final + 5.1.1.Final org.hibernate hibernate-c3p0 - 4.1.3.Final + 5.1.1.Final - + + org.hibernate + hibernate-java8 + 5.1.1.Final + + + org.javassist javassist 3.16.1-GA From eac408dffcc3aa0dcb95d6c5779c35507726586c Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Wed, 17 Aug 2016 10:52:25 +0200 Subject: [PATCH 08/11] Use higher precision time stamps internally This reduces the probability of entries sharing the same timestamps. Shared timestamps cause issues with pagination. --- .../adapter/HibernateFeedPublisher.java | 8 +- .../adapter/HibernateFeedSource.java | 5 +- .../HibernateFeedRepositoryTest.java | 163 +++++++++++++++++- .../adapter/MigrationFeedPublisher.java | 3 +- .../adapter/jpa/PersistedEntry.java | 50 +++--- .../java/org/atomhopper/util/NanoClock.java | 49 ++++++ .../adapter/jpa/PersistedEntryTest.java | 8 +- .../atomhopper/FeedForwardBackwardTest.java | 143 +++++++++------ 8 files changed, 333 insertions(+), 96 deletions(-) create mode 100644 hopper/src/main/java/org/atomhopper/util/NanoClock.java diff --git a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java index a042cb47..d525eabd 100644 --- a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java +++ b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java @@ -91,8 +91,8 @@ public AdapterResponse postEntry(PostEntryRequest postEntryRequest) { Date updated = abderaParsedEntry.getUpdated(); if (updated != null) { - persistedEntry.setDateLastUpdated(updated); - persistedEntry.setCreationDate(updated); + persistedEntry.setDateLastUpdated(updated.toInstant()); + persistedEntry.setCreationDate(updated.toInstant()); } } @@ -106,8 +106,8 @@ public AdapterResponse postEntry(PostEntryRequest postEntryRequest) { persistedEntry.setFeed(feedRef); persistedEntry.setEntryBody(entryToString(abderaParsedEntry)); - abderaParsedEntry.setUpdated(persistedEntry.getDateLastUpdated()); - abderaParsedEntry.setPublished(persistedEntry.getCreationDate()); + abderaParsedEntry.setUpdated(Date.from(persistedEntry.getDateLastUpdated())); + abderaParsedEntry.setPublished(Date.from(persistedEntry.getCreationDate())); feedRepository.saveEntry(persistedEntry); diff --git a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedSource.java b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedSource.java index 44907201..21ad848c 100644 --- a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedSource.java +++ b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedSource.java @@ -4,6 +4,7 @@ import java.io.UnsupportedEncodingException; import java.net.URL; import java.net.URLEncoder; +import java.sql.Date; import java.util.List; import java.util.Map; import java.util.UUID; @@ -163,8 +164,8 @@ private Entry hydrateEntry(PersistedEntry persistedEntry, Abdera abderaReference if (hydratedEntryDocument != null) { entry = hydratedEntryDocument.getRoot(); - entry.setUpdated(persistedEntry.getDateLastUpdated()); - entry.setPublished(persistedEntry.getCreationDate()); + entry.setUpdated(Date.from(persistedEntry.getDateLastUpdated())); + 
entry.setPublished(Date.from(persistedEntry.getCreationDate())); } return entry; diff --git a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java index d5840df4..b338f2cc 100644 --- a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java +++ b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java @@ -1,8 +1,11 @@ package org.atomhopper.hibernate; +import static org.junit.Assert.assertFalse; + import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -18,9 +21,12 @@ import org.atomhopper.adapter.jpa.PersistedCategory; import org.atomhopper.adapter.jpa.PersistedEntry; import org.atomhopper.adapter.jpa.PersistedFeed; +import org.atomhopper.dbal.PageDirection; import org.atomhopper.hibernate.actions.SimpleSessionAction; +import org.atomhopper.hibernate.query.SimpleCategoryCriteriaGenerator; import org.hibernate.Session; import org.hibernate.criterion.Restrictions; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -63,7 +69,7 @@ public static void setup() throws Exception { Map parameters; parameters = new HashMap(); parameters.put("hibernate.connection.driver_class", "org.h2.Driver"); - parameters.put("hibernate.connection.url", "jdbc:h2:mem:WhenCreatingFeed"); + parameters.put("hibernate.connection.url", "jdbc:h2:mem:WhenCreatingEntry"); parameters.put("hibernate.dialect", "org.hibernate.dialect.H2Dialect"); parameters.put("hibernate.connection.username", "sa"); parameters.put("hibernate.connection.password", ""); @@ -122,6 +128,108 @@ public void perform(Session liveSession) { } }); } + + @Test + public void pagingForwardsShouldNotFindTheEntry() { + final PersistedEntry entry = feedRepository.getEntry("entryId", "feedName"); + final List page = feedRepository.getFeedPage( + "feedName", entry, PageDirection.FORWARD, new SimpleCategoryCriteriaGenerator(""), 1 + ); + Assert.assertTrue(page.isEmpty()); + } + + @Test + public void pagingBackwardsShouldFindTheEntry() { + final PersistedEntry entry = feedRepository.getEntry("entryId", "feedName"); + final List page = feedRepository.getFeedPage( + "feedName", entry, PageDirection.BACKWARD, new SimpleCategoryCriteriaGenerator(""), 1 + ); + Assert.assertEquals(1, page.size()); + Assert.assertEquals("entryId", page.get(0).getEntryId()); + } + } + + public static class WhenCreatingTwoEntries { + static HibernateFeedRepository feedRepository; + + @BeforeClass + public static void setup() throws Exception { + Map parameters; + parameters = new HashMap(); + parameters.put("hibernate.connection.driver_class", "org.h2.Driver"); + parameters.put("hibernate.connection.url", "jdbc:h2:mem:WhenCreatingTwoEntries"); + parameters.put("hibernate.dialect", "org.hibernate.dialect.H2Dialect"); + parameters.put("hibernate.connection.username", "sa"); + parameters.put("hibernate.connection.password", ""); + parameters.put("hibernate.hbm2ddl.auto", "update"); + + feedRepository = new HibernateFeedRepository(parameters); + + PersistedFeed feed = new PersistedFeed("feedName", "feedId"); + + PersistedEntry entry1 = new PersistedEntry("entry1"); + Instant earlier = Instant.now().minusSeconds(60); + entry1.setDateLastUpdated(earlier); + entry1.setFeed(feed); + + 
feedRepository.saveEntry(entry1); + + PersistedEntry entry2 = new PersistedEntry("entry2"); + Instant now = Instant.now(); + entry2.setDateLastUpdated(now); + entry2.setFeed(feed); + + feedRepository.saveEntry(entry2); + } + + @Test + public void entriesShouldBeSortedByRecency() throws Exception { + final List entries = feedRepository.getFeedHead( + "feedName", new SimpleCategoryCriteriaGenerator(""), 2 + ); + Assert.assertEquals(entries.get(0).getEntryId(), "entry2"); + Assert.assertEquals(entries.get(1).getEntryId(), "entry1"); + } + + @Test + public void pagingBackwardsFromLatestShouldFindBoth() { + final PersistedEntry latest = feedRepository.getEntry("entry2", "feedName"); + final List page = feedRepository.getFeedPage( + "feedName", latest, PageDirection.BACKWARD, new SimpleCategoryCriteriaGenerator(""), 2 + ); + Assert.assertEquals(2, page.size()); + Assert.assertEquals("entry2", page.get(0).getEntryId()); + Assert.assertEquals("entry1", page.get(1).getEntryId()); + } + + @Test + public void pagingForwardsFromLatestShouldNotFindAnything() { + final PersistedEntry latest = feedRepository.getEntry("entry2", "feedName"); + final List page = feedRepository.getFeedPage( + "feedName", latest, PageDirection.FORWARD, new SimpleCategoryCriteriaGenerator(""), 2 + ); + Assert.assertTrue(page.isEmpty()); + } + + @Test + public void pagingBackwardsFromOldestShouldOnlyFindOldest() { + final PersistedEntry oldest = feedRepository.getEntry("entry1", "feedName"); + final List page = feedRepository.getFeedPage( + "feedName", oldest, PageDirection.BACKWARD, new SimpleCategoryCriteriaGenerator(""), 2 + ); + Assert.assertEquals(1, page.size()); + Assert.assertEquals("entry1", page.get(0).getEntryId()); + } + + @Test + public void pagingForwardsFromOldestShouldFindLatest() { + final PersistedEntry oldest = feedRepository.getEntry("entry1", "feedName"); + final List page = feedRepository.getFeedPage( + "feedName", oldest, PageDirection.FORWARD, new SimpleCategoryCriteriaGenerator(""), 2 + ); + Assert.assertEquals(1, page.size()); + Assert.assertEquals("entry2", page.get(0).getEntryId()); + } } public static class WhenGettingCategories { @@ -321,5 +429,58 @@ public Integer run() { r1.get(3000); r2.get(3000); } + + @Test + public void entriesShouldGetDistinctIds() throws Exception { + + final PersistedEntry existing = new PersistedEntry("existing"); + final PersistedFeed feed = new PersistedFeed("feed1", "feed1"); + feed.setEntries(Collections.singleton(existing)); + existing.setFeed(feed); + feedRepository.saveEntry(existing); + + final Runner.Future r1 = runner1.run(new Runner.Operation() { + @Override + public PersistedEntry run() { + final PersistedEntry entry = new PersistedEntry("entry2a"); + feed.setEntries(Collections.singleton(entry)); + entry.setFeed(feed); + try { + barrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + + feedRepository.saveEntry(entry); + return entry; + } + }); + final Runner.Future r2 = runner2.run(new Runner.Operation() { + @Override + public PersistedEntry run() { + final PersistedEntry entry = new PersistedEntry("entry2b"); + feed.setEntries(Collections.singleton(entry)); + entry.setFeed(feed); + try { + barrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + + feedRepository.saveEntry(entry); + return entry; + } + }); + final PersistedEntry entry1 = r1.get(3000); + final PersistedEntry entry2 = 
r2.get(3000); + + assertFalse(entry1.getEntryId().equals(entry2.getEntryId())); + assertFalse(entry1.getCreationDate().isBefore(existing.getCreationDate())); + assertFalse(entry2.getCreationDate().isBefore(existing.getCreationDate())); + } } } diff --git a/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java b/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java index 43ccbd27..3f2ed66b 100644 --- a/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java +++ b/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java @@ -15,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Date; import java.util.Map; import java.util.UUID; @@ -69,7 +70,7 @@ public AdapterResponse postEntry(PostEntryRequest postEntryRequest) { // If allowOverrideDate is false then set the DateLastUpdated // Also set the DateLastUpdated if allowOverrideDate is true, but no DateLastUpdated was sent in the entry if (!allowOverrideDate || postEntryRequest.getEntry().getUpdated() == null) { - postEntryRequest.getEntry().setUpdated(entry.getDateLastUpdated()); + postEntryRequest.getEntry().setUpdated(Date.from(entry.getDateLastUpdated())); } switch (writeTo) { diff --git a/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java b/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java index 64efbe78..23bcf603 100644 --- a/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java +++ b/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java @@ -1,5 +1,12 @@ package org.atomhopper.adapter.jpa; +import java.io.Serializable; +import java.time.Clock; +import java.time.Instant; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import javax.persistence.Basic; import javax.persistence.CascadeType; import javax.persistence.Column; @@ -12,15 +19,8 @@ import javax.persistence.ManyToMany; import javax.persistence.ManyToOne; import javax.persistence.Table; -import javax.persistence.Temporal; -import javax.persistence.TemporalType; -import java.io.Serializable; -import java.util.Calendar; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.Set; -import java.util.TimeZone; + +import org.atomhopper.util.NanoClock; @Entity @Table(name = "Entries") @@ -49,22 +49,18 @@ public class PersistedEntry implements Serializable { @Basic(optional = false) @Column(name = "CreationDate") - @Temporal(TemporalType.TIMESTAMP) - private Date creationDate; + private Instant creationDate; @Basic(optional = false) @Column(name = "DateLastUpdated") - @Temporal(TemporalType.TIMESTAMP) - private Date dateLastUpdated; + private Instant dateLastUpdated; public PersistedEntry() { categories = Collections.EMPTY_SET; - - final Calendar localNow = Calendar.getInstance(TimeZone.getDefault()); - localNow.setTimeInMillis(System.currentTimeMillis()); - - creationDate = localNow.getTime(); - dateLastUpdated = localNow.getTime(); + Clock nanoClock = new NanoClock(); + Instant now = Instant.now(nanoClock); + creationDate = now; + dateLastUpdated = now; } public PersistedEntry(String entryId) { @@ -74,20 +70,20 @@ public PersistedEntry(String entryId) { this.entryId = entryId; } - public Date getCreationDate() { - return (Date) creationDate.clone(); + public Instant getCreationDate() { + return creationDate; } - public void setCreationDate(Date creationDate) { - 
this.creationDate = (Date) creationDate.clone(); + public void setCreationDate(Instant creationDate) { + this.creationDate = creationDate; } - public Date getDateLastUpdated() { - return (Date) dateLastUpdated.clone(); + public Instant getDateLastUpdated() { + return dateLastUpdated; } - public void setDateLastUpdated(Date dateLastUpdated) { - this.dateLastUpdated = (Date) dateLastUpdated.clone(); + public void setDateLastUpdated(Instant dateLastUpdated) { + this.dateLastUpdated = dateLastUpdated; } public Set getCategories() { diff --git a/hopper/src/main/java/org/atomhopper/util/NanoClock.java b/hopper/src/main/java/org/atomhopper/util/NanoClock.java new file mode 100644 index 00000000..ad43640f --- /dev/null +++ b/hopper/src/main/java/org/atomhopper/util/NanoClock.java @@ -0,0 +1,49 @@ +package org.atomhopper.util; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; + +public class NanoClock extends Clock +{ + private final Clock clock; + + private final long initialNanos; + + private final Instant initialInstant; + + public NanoClock() + { + this(Clock.systemUTC()); + } + + public NanoClock(final Clock clock) + { + this.clock = clock; + initialInstant = clock.instant(); + initialNanos = getSystemNanos(); + } + + @Override + public ZoneId getZone() + { + return clock.getZone(); + } + + @Override + public Instant instant() + { + return initialInstant.plusNanos(getSystemNanos() - initialNanos); + } + + @Override + public Clock withZone(final ZoneId zone) + { + return new NanoClock(clock.withZone(zone)); + } + + private long getSystemNanos() + { + return System.nanoTime(); + } +} \ No newline at end of file diff --git a/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java b/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java index 83b1a955..fb44afbb 100644 --- a/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java +++ b/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java @@ -5,7 +5,7 @@ import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; -import java.util.Date; +import java.time.Instant; import java.util.HashSet; import java.util.Set; @@ -44,7 +44,7 @@ public void shouldCreatePersistedEntryWithEntryId() throws Exception { public static class WhenAccessingPersistedEntries { private PersistedEntry persistedEntry; - private Date date; + private Instant date; private Set persistedCategories; @Before @@ -57,7 +57,7 @@ public void shouldReturnCreationDate() throws Exception { assertNotNull("Getting creation date should not return null.", persistedEntry.getCreationDate()); date = persistedEntry.getCreationDate(); assertEquals("Getting creation date should return a Date object.", persistedEntry.getCreationDate(), date); - persistedEntry.setCreationDate(new Date()); + persistedEntry.setCreationDate(Instant.now()); assertNotSame("Setting the creation date should update the object.", persistedEntry.getCreationDate(), date); } @@ -66,7 +66,7 @@ public void shouldReturnDateLastUpdated() throws Exception { assertNotNull("Getting the date last updated should not return null.", persistedEntry.getDateLastUpdated()); date = persistedEntry.getDateLastUpdated(); assertEquals("Getting the date last updated should return a date object.", persistedEntry.getDateLastUpdated(), date); - persistedEntry.setDateLastUpdated(new Date()); + persistedEntry.setDateLastUpdated(Instant.now()); assertNotSame("Setting the date last updated should change last updated date.", 
persistedEntry.getDateLastUpdated(), date); } diff --git a/test-suite/src/test/java/org/atomhopper/FeedForwardBackwardTest.java b/test-suite/src/test/java/org/atomhopper/FeedForwardBackwardTest.java index 13847f20..1db96f8b 100644 --- a/test-suite/src/test/java/org/atomhopper/FeedForwardBackwardTest.java +++ b/test-suite/src/test/java/org/atomhopper/FeedForwardBackwardTest.java @@ -1,5 +1,16 @@ package org.atomhopper; +import static java.util.stream.Collectors.toList; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; + +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.apache.abdera.Abdera; import org.apache.abdera.model.Document; @@ -16,102 +27,120 @@ import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; - -import static junit.framework.Assert.assertEquals; - - @RunWith(Enclosed.class) public class FeedForwardBackwardTest extends JettyIntegrationTestHarness { private static final HttpClient httpClient = new HttpClient(); private static final String urlAndPort = "http://localhost:" + getPort(); private static Abdera abdera = null; - + public static synchronized Abdera getInstance() { if (abdera == null) { abdera = new Abdera(); } return abdera; - } + } public static PostMethod newPostEntryMethod(String content) { final PostMethod post = new PostMethod(getURL()); post.addRequestHeader(new Header("content-type", "application/atom+xml")); - post.setRequestBody("Chad" + content + ""); + post.setRequestBody( + "Chad" + + content + ""); return post; } - + public static String getURL() { return urlAndPort + "/namespace3/feed3/"; } - + public static GetMethod getFeedMethod() { return new GetMethod(getURL()); - } - + } + public static String getFeedDirectionForwardMethod(String markerId) { return getURL() + "?marker=" + markerId + "&direction=forward&limit=10"; - } - + } + public static String getFeedDirectionBackwardMethod(String markerId) { return getURL() + "?marker=" + markerId + "&direction=backward&limit=10"; - } - + } + public static class WhenRequestingFeed { @Test public void shouldOrderCorrectlyForwardAndBackward() throws Exception { + Parser parser = getInstance().getParser(); + URL url = new URL(getURL()); + List idsOfPostedEntries = new ArrayList<>(); + final HttpMethod getFeedMethod = getFeedMethod(); - assertEquals("Hitting Atom Hopper with an empty datastore should return a 200", HttpStatus.SC_OK, httpClient.executeMethod(getFeedMethod)); + assertEquals("Hitting Atom Hopper with an empty datastore should return a 200", HttpStatus.SC_OK, + httpClient.executeMethod(getFeedMethod)); // Create 20 new entries - for(int i = 1; i < 21; i++) { + for (int i = 1; i < 21; i++) { final HttpMethod postMethod = newPostEntryMethod("" + Integer.toString(i) + ""); - assertEquals("Creating a new entry should return a 201", HttpStatus.SC_CREATED, httpClient.executeMethod(postMethod)); + assertEquals("Creating a new entry should return a 201", HttpStatus.SC_CREATED, + httpClient.executeMethod(postMethod)); + final Document response = parser.parse(postMethod.getResponseBodyAsStream(), url.toString()); + idsOfPostedEntries.add(response.getRoot().getId().toString()); } - + // namespace3/feed3 - assertEquals("Getting a feed should return a 200", HttpStatus.SC_OK, 
httpClient.executeMethod(getFeedMethod)); + assertEquals("Getting a feed should return a 200", HttpStatus.SC_OK, + httpClient.executeMethod(getFeedMethod)); // A bit verbose, but it checks the forward and backward direction of the feed - Parser parser = getInstance().getParser(); - URL url = new URL(getURL()); Document doc = parser.parse(url.openStream(), url.toString()); - Feed feed = doc.getRoot(); - List idList = new ArrayList(); - - // Get the IDs in their default order - for (Entry entry : feed.getEntries()) { - idList.add(entry.getId().toString()); + List entries = doc.getRoot().getEntries(); + List idList = entries.stream().map(e -> e.getId().toString()).collect(Collectors.toList()); + assertTrue("All posted entries should be found in feed page", idList.containsAll(idsOfPostedEntries)); + assertTrue("Feed page should only contain the posted entries", idsOfPostedEntries.containsAll(idList)); + //entries.forEach(e -> System.out.printf("%s %s\n", e.getId(), e.getPublished().toInstant())); + for (int i = 1; i < entries.size(); i++) { + Entry first = entries.get(i-1); + Entry second = entries.get(i); + assertFalse( + String.format( + "Entry %s created at %s should be listed before entry %s created at %s", + first, first.getPublished().toInstant(), second, second.getPublished().toInstant() + ), + first.getPublished().before(second.getPublished()) + ); + } + + if (idList.isEmpty()) { + fail(); } - - if(!(idList.isEmpty())) { - int idCount = 0; - - // Check the feed backward with the first id as the marker - URL urlBackward = new URL(getFeedDirectionBackwardMethod(idList.get(0))); - Document docBackward = parser.parse(urlBackward.openStream(), urlBackward.toString()); - Feed feedBackward = docBackward.getRoot(); - - for (Entry entry : feedBackward.getEntries()) { - assertEquals("The entries should be in backward order", entry.getId().toString(), idList.get(idCount)); - idCount++; - } - - // Check the feed forward with the last id as the marker - URL urlForward = new URL(getFeedDirectionForwardMethod(idList.get(idList.size() - 1))); - Document docForward = parser.parse(urlForward.openStream(), urlForward.toString()); - Feed feedForward = docForward.getRoot(); - // Adjust for the offset going forward - idCount = 9; - - for (Entry entry : feedForward.getEntries()) { - assertEquals("The entries should be in forward order", entry.getId().toString(), idList.get(idCount)); - idCount++; - } + + int idCount = 0; + final String marker = idList.get(0); + // Check the feed backward with the first id as the marker + URL urlBackward = new URL(getFeedDirectionBackwardMethod(marker)); + Document docBackward = parser.parse(urlBackward.openStream(), urlBackward.toString()); + Feed feedBackward = docBackward.getRoot(); + List backwardIds = feedBackward.getEntries().stream().map(e->e.getId().toString()).collect(toList()); + + assertTrue("The backwards feed page should contain the marker", backwardIds.contains(marker)); + assertEquals("The first entry should be the marker", marker, backwardIds.get(0)); + assertTrue("The entries on the backwards page should all be known", idList.containsAll(backwardIds)); + for (Entry entry : feedBackward.getEntries()) { + assertEquals("The entries should be in backward order", entry.getId().toString(), idList.get(idCount)); + idCount++; + } + + // Check the feed forward with the last id as the marker + URL urlForward = new URL(getFeedDirectionForwardMethod(idList.get(idList.size() - 1))); + Document docForward = parser.parse(urlForward.openStream(), urlForward.toString()); + 
Feed feedForward = docForward.getRoot(); + // Adjust for the offset going forward + idCount = 9; + + for (Entry entry : feedForward.getEntries()) { + assertEquals("The entries should be in forward order", entry.getId().toString(), idList.get(idCount)); + idCount++; } } - } + } } From 59d016c54922f92bb4c1e2a8a9bb0918116ccad3 Mon Sep 17 00:00:00 2001 From: Magnus Reftel Date: Fri, 9 Sep 2016 14:52:48 +0200 Subject: [PATCH 09/11] hibernate: Use database timestamps for entries If application servers have clocks that are not in sync, then it is possible for timestamps to not be increasing. If there are two application servers with out-of-sync clocks, where the clock of server A is a bit ahead of the clock of server B, then it's possible that an entry written by A gets a higher timestamp than en entry written at a later point by server B. If a consumer read the entry written by server A, and then used it as a marker, then it would not receive the entry written by server B. This change makes all application servers ask the database server for the clock, similar to how it is done in the Postgres adapter. --- .../hibernate/HibernateFeedRepository.java | 17 +++++++++++++++++ .../adapter/HibernateFeedPublisher.java | 4 ++-- .../hibernate/HibernateFeedRepositoryTest.java | 2 ++ .../adapter/HibernateFeedSourceTest.java | 6 +++++- .../adapter/MigrationFeedPublisher.java | 9 ++++++--- .../atomhopper/adapter/jpa/PersistedEntry.java | 7 ------- .../adapter/jpa/PersistedEntryTest.java | 2 ++ 7 files changed, 34 insertions(+), 13 deletions(-) diff --git a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java index 0fd6af9b..9dc79996 100644 --- a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java +++ b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java @@ -1,5 +1,7 @@ package org.atomhopper.hibernate; +import java.sql.Timestamp; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -18,10 +20,13 @@ import org.atomhopper.hibernate.actions.SimpleSessionAction; import org.atomhopper.hibernate.query.CategoryCriteriaGenerator; import org.hibernate.Criteria; +import org.hibernate.LockMode; +import org.hibernate.Query; import org.hibernate.Session; import org.hibernate.Transaction; import org.hibernate.criterion.Order; import org.hibernate.criterion.Restrictions; +import org.hibernate.type.TimestampType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -256,6 +261,18 @@ public void perform(Session liveSession) { feed = entry.getFeed(); } + if (entry.getCreationDate() == null || entry.getDateLastUpdated() == null) { + final Instant now = ((Timestamp)liveSession.createQuery( + "select coalesce(max(current_timestamp()), current_timestamp()) from PersistedFeed" + ).uniqueResult()).toInstant(); + if (entry.getCreationDate() == null) { + entry.setCreationDate(now); + } + if (entry.getDateLastUpdated() == null) { + entry.setDateLastUpdated(now); + } + } + liveSession.saveOrUpdate(feed); liveSession.save(entry); } diff --git a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java index d525eabd..09c63266 100644 --- a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java +++ 
b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/adapter/HibernateFeedPublisher.java @@ -106,11 +106,11 @@ public AdapterResponse postEntry(PostEntryRequest postEntryRequest) { persistedEntry.setFeed(feedRef); persistedEntry.setEntryBody(entryToString(abderaParsedEntry)); + feedRepository.saveEntry(persistedEntry); + abderaParsedEntry.setUpdated(Date.from(persistedEntry.getDateLastUpdated())); abderaParsedEntry.setPublished(Date.from(persistedEntry.getCreationDate())); - feedRepository.saveEntry(persistedEntry); - incrementCounterForFeed(postEntryRequest.getFeedName()); return ResponseBuilder.created(abderaParsedEntry); diff --git a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java index b338f2cc..6679e7c5 100644 --- a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java +++ b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/HibernateFeedRepositoryTest.java @@ -170,6 +170,7 @@ public static void setup() throws Exception { PersistedEntry entry1 = new PersistedEntry("entry1"); Instant earlier = Instant.now().minusSeconds(60); entry1.setDateLastUpdated(earlier); + entry1.setCreationDate(earlier); entry1.setFeed(feed); feedRepository.saveEntry(entry1); @@ -177,6 +178,7 @@ public static void setup() throws Exception { PersistedEntry entry2 = new PersistedEntry("entry2"); Instant now = Instant.now(); entry2.setDateLastUpdated(now); + entry2.setCreationDate(now); entry2.setFeed(feed); feedRepository.saveEntry(entry2); diff --git a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/adapter/HibernateFeedSourceTest.java b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/adapter/HibernateFeedSourceTest.java index 246414be..89a5a3a7 100644 --- a/adapters/hibernate/src/test/java/org/atomhopper/hibernate/adapter/HibernateFeedSourceTest.java +++ b/adapters/hibernate/src/test/java/org/atomhopper/hibernate/adapter/HibernateFeedSourceTest.java @@ -1,6 +1,7 @@ package org.atomhopper.hibernate.adapter; import java.net.URL; +import java.time.Instant; import java.util.*; import static junit.framework.Assert.assertEquals; @@ -72,6 +73,9 @@ public void setUp() throws Exception { persistedEntry.setFeed(persistedFeed); persistedEntry.setEntryId(ID); persistedEntry.setEntryBody(ENTRY_BODY); + Instant now = Instant.now(); + persistedEntry.setCreationDate(now); + persistedEntry.setDateLastUpdated(now); } @Test(expected = UnsupportedOperationException.class) @@ -170,4 +174,4 @@ public void shouldGetCurrentLinkFromArchiveFeedAndArchiveNode() throws Exception assertTrue("'' node should exist", found ); } } -} \ No newline at end of file +} diff --git a/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java b/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java index 3f2ed66b..71eb8bb9 100644 --- a/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java +++ b/adapters/migration/src/main/java/org/atomhopper/migration/adapter/MigrationFeedPublisher.java @@ -15,8 +15,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Calendar; import java.util.Date; import java.util.Map; +import java.util.TimeZone; import java.util.UUID; public class MigrationFeedPublisher implements FeedPublisher { @@ -59,8 +61,6 @@ public void setAllowOverrideDate(boolean allowOverrideDate) { @Override 
     public AdapterResponse postEntry(PostEntryRequest postEntryRequest) {
-        PersistedEntry entry = new PersistedEntry();
-
         // If allowOverrideId is false then set the Id
         // Also set the id if allowOverrideId is true, but no Id was sent in the entry
         if (!allowOverrideId || postEntryRequest.getEntry().getId() == null || StringUtils.isBlank(postEntryRequest.getEntry().getId().toString().trim())) {
@@ -70,7 +70,10 @@ public AdapterResponse postEntry(PostEntryRequest postEntryRequest) {
         // If allowOverrideDate is false then set the DateLastUpdated
         // Also set the DateLastUpdated if allowOverrideDate is true, but no DateLastUpdated was sent in the entry
         if (!allowOverrideDate || postEntryRequest.getEntry().getUpdated() == null) {
-            postEntryRequest.getEntry().setUpdated(Date.from(entry.getDateLastUpdated()));
+            final Calendar localNow = Calendar.getInstance(TimeZone.getDefault());
+            localNow.setTimeInMillis(System.currentTimeMillis());
+
+            postEntryRequest.getEntry().setUpdated(localNow.getTime());
         }
 
         switch (writeTo) {
diff --git a/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java b/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java
index 23bcf603..f0152494 100644
--- a/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java
+++ b/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java
@@ -1,7 +1,6 @@
 package org.atomhopper.adapter.jpa;
 
 import java.io.Serializable;
-import java.time.Clock;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashSet;
@@ -20,8 +19,6 @@
 import javax.persistence.ManyToOne;
 import javax.persistence.Table;
 
-import org.atomhopper.util.NanoClock;
-
 @Entity
 @Table(name = "Entries")
 public class PersistedEntry implements Serializable {
@@ -57,10 +54,6 @@ public class PersistedEntry implements Serializable {
 
     public PersistedEntry() {
         categories = Collections.EMPTY_SET;
-        Clock nanoClock = new NanoClock();
-        Instant now = Instant.now(nanoClock);
-        creationDate = now;
-        dateLastUpdated = now;
     }
 
     public PersistedEntry(String entryId) {
diff --git a/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java b/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java
index fb44afbb..cfe8a713 100644
--- a/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java
+++ b/hopper/src/test/java/org/atomhopper/adapter/jpa/PersistedEntryTest.java
@@ -50,6 +50,8 @@ public static class WhenAccessingPersistedEntries {
         @Before
         public void setUp() throws Exception {
             persistedEntry = new PersistedEntry();
+            persistedEntry.setCreationDate(Instant.now());
+            persistedEntry.setDateLastUpdated(Instant.now());
         }
 
         @Test

From 3091020c7c95519f35d7e5f49af8d68cd3f5007b Mon Sep 17 00:00:00 2001
From: Magnus Reftel
Date: Fri, 9 Sep 2016 14:40:46 +0200
Subject: [PATCH 10/11] hibernate: Lock the feed when adding entries

Otherwise, there's a race condition where newer entries can get older
timestamps: if server A starts a transaction, gets a timestamp, but then
gets blocked, and server B then starts a transaction, gets a higher
timestamp, and commits, then when server A resumes, its entry will be
written at a later time than the entry from server B, but with a lower
timestamp. A consumer that read the entry from server B and used it as
a marker would then not get the entry from server A.

Locking the feed before getting the timestamp and holding the lock until
the commit solves this issue, at the cost of serializing all writes to a
feed.
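Taken together with the previous patch, the intended write path is
roughly the sketch below. Only Session.lock() and the coalesce query are
taken from the actual diffs; the feed lookup, variable names, and
transaction handling are illustrative assumptions about the surrounding
repository code.

    Transaction tx = session.beginTransaction();
    PersistedFeed feed = (PersistedFeed) session.get(PersistedFeed.class, feedName);
    if (feed != null) {
        // Serialize all writers to this feed before reading the clock.
        session.lock(feed, LockMode.PESSIMISTIC_WRITE);
    }
    // Read the database clock while the feed lock is held.
    Timestamp now = (Timestamp) session
            .createQuery("select coalesce(max(current_timestamp()), current_timestamp()) from PersistedFeed")
            .uniqueResult();
    entry.setCreationDate(now.toInstant());
    entry.setDateLastUpdated(now.toInstant());
    session.save(entry);
    tx.commit(); // the lock on the feed is released here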
---
 .../java/org/atomhopper/hibernate/HibernateFeedRepository.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java
index 9dc79996..c1535e6c 100644
--- a/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java
+++ b/adapters/hibernate/src/main/java/org/atomhopper/hibernate/HibernateFeedRepository.java
@@ -259,6 +259,8 @@ public void perform(Session liveSession) {
 
                 if (feed == null) {
                     feed = entry.getFeed();
+                } else {
+                    liveSession.lock(feed, LockMode.PESSIMISTIC_WRITE);
                 }
 
                 if (entry.getCreationDate() == null || entry.getDateLastUpdated() == null) {

From 77393d07e4fb9513655f888b3dbb09a78be71642 Mon Sep 17 00:00:00 2001
From: Magnus Reftel
Date: Fri, 9 Sep 2016 14:43:19 +0200
Subject: [PATCH 11/11] hibernate: Mark timestamps unique

Pagination doesn't work well when a marker has the same timestamp as
other entries. If one producer writes an entry, a consumer reads it, and
then another producer writes a new entry that gets the same timestamp as
the first entry, then it's not clear whether the consumer should get
this second entry or not when using the first entry as a marker. If
entries with the same timestamp as the marker are included, then
consumers may get an entry multiple times whenever it shares a timestamp
with the marker. If entries with the same timestamp as the marker are
not included, then consumers may miss entries.

Secondary sorting on ID also doesn't work, since a new entry that
happens to get a lower-sorting ID would be considered older than an
existing entry with a higher-sorting ID.

This commit prevents these error conditions by forbidding entries in the
same feed from sharing timestamps. A producer that happens to get the
same timestamp as an earlier entry will have to retry, giving it a new
timestamp. This ensures that consumers get each entry exactly once. Due
to the high-resolution timestamps and the lock on the feed, collisions
should in practice never happen.
---
 .../java/org/atomhopper/adapter/jpa/PersistedEntry.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java b/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java
index f0152494..5e59ad62 100644
--- a/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java
+++ b/hopper/src/main/java/org/atomhopper/adapter/jpa/PersistedEntry.java
@@ -18,9 +18,15 @@
 import javax.persistence.ManyToMany;
 import javax.persistence.ManyToOne;
 import javax.persistence.Table;
+import javax.persistence.UniqueConstraint;
 
 @Entity
-@Table(name = "Entries")
+@Table(
+    name = "Entries", uniqueConstraints = {
+        @UniqueConstraint(columnNames= {"Feed", "CreationDate"}),
+        @UniqueConstraint(columnNames= {"Feed", "DateLastUpdated"})
+    }
+)
 public class PersistedEntry implements Serializable {
 
     @Id
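
The retry that the last commit message describes is not part of this
series; a publisher built on the hibernate adapter would need something
along the lines of the sketch below. The method and the retry bound are
assumptions for illustration, and depending on the Spring/Hibernate
wiring the violation may surface wrapped in another exception rather
than directly as org.hibernate.exception.ConstraintViolationException.

    private void saveEntryWithRetry(HibernateFeedRepository feedRepository, PersistedEntry persistedEntry) {
        int attempts = 0;
        while (true) {
            try {
                feedRepository.saveEntry(persistedEntry);
                return;
            } catch (ConstraintViolationException e) {
                if (++attempts >= 3) { // give up after a few collisions
                    throw e;
                }
                // Clearing the timestamps makes HibernateFeedRepository re-read the
                // database clock on the next attempt, so the retry gets a fresh timestamp.
                persistedEntry.setCreationDate(null);
                persistedEntry.setDateLastUpdated(null);
            }
        }
    }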