Merge pull request #70 from kdnilsen/fast-and-furious

Fast and furious

kdnilsen authored Aug 2, 2022
2 parents 94f0cf4 + 1875ac2 commit 969c776
Showing 6 changed files with 569 additions and 67 deletions.
27 changes: 27 additions & 0 deletions Extremem/README.md
@@ -175,6 +175,33 @@ java -jar src/main/java/extremem.jar \
-dCustomerPeriod=12s -dCustomerThinkTime=8s -dSimulationDuration=20m
```

### *-dFastAndFurious=false*

In the default Extremem configuration, the shared Customers and Products in-memory databases are each protected by a global
synchronization lock that allows multiple readers or a single writer. Multiple customer threads can read from these databases
concurrently, but each time a server thread replaces customers or products it must acquire the write lock, forcing all customer
threads to wait until the server thread has finished its changes to the database. At the high transaction rates required to
represent allocation in excess of 2 GB per second, significant synchronization contention has been observed. This flag changes
the synchronization protocol. The FastAndFurious mode of operation replaces the global multiple-reader-single-writer lock with
a larger number of smaller-context locks. Because each of these locks protects a much smaller scope, it is held for a much
shorter time, improving parallel access to shared data structures. The large majority of these smaller-context locks should
normally be uncontended, because the contexts are so small that collisions by multiple threads on the same context are rare.
This mode of operation is described as "furious" because it allows false positives and false negatives: during the process of
replacing products, the indexes might report a match to a product that no longer exists, and likewise the indexes may fail to
match a newly added product that is not yet indexed. This mode of operation still uses synchronization properly to assure
coherency of its data structures.

The default value of the FastAndFurious flag is false, preserving compatibility with the original Extremem mode of operation.
While configuring FastAndFurious=true allows Extremem to simulate higher allocation rates with less interference from
synchronization contention, disabling FastAndFurious may reveal different weaknesses in particular GC approaches. In
particular, interference from synchronization makes allocation more bursty. While a single server thread holds the index
locks in order to replace products or customers, the multiple customer threads that would normally be allocating sit idle,
waiting for the server thread to release its exclusive lock. When the server thread releases the lock, these customer threads
resume execution and allocate at rates much higher than normal because they have fallen behind their intended execution
schedule. The resulting bursts of allocation make it difficult for the GC scheduling heuristic to predict when the allocation
pool will become depleted. If the heuristic triggers the start of GC too late, the allocation pool is likely to be exhausted
before the GC replenishes it, resulting in a degenerated stop-the-world GC pause.
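The smaller-context locking scheme described above can be sketched as follows. This is an illustrative example, not Extremem
source; the names (`StripedMap`, `NUM_STRIPES`) are hypothetical. Each key hashes to one of many stripes, and a lock is held
only for the brief access to that stripe, so a writer (such as a server thread replacing a product) blocks only threads that
collide on the same stripe:

```java
import java.util.HashMap;

// Hypothetical sketch of small-context ("striped") locking, in contrast
// to one global multiple-reader-single-writer lock over the whole map.
class StripedMap<K, V> {
  private static final int NUM_STRIPES = 64;
  private final Object[] stripes = new Object[NUM_STRIPES];
  private final HashMap<K, V>[] maps;

  @SuppressWarnings("unchecked")
  StripedMap() {
    maps = (HashMap<K, V>[]) new HashMap[NUM_STRIPES];
    for (int i = 0; i < NUM_STRIPES; i++) {
      stripes[i] = new Object();
      maps[i] = new HashMap<K, V>();
    }
  }

  private int stripeFor(Object key) {
    // Mask off the sign bit so the stripe index is non-negative.
    return (key.hashCode() & 0x7fffffff) % NUM_STRIPES;
  }

  V get(K key) {
    synchronized (stripes[stripeFor(key)]) { // small scope, briefly held
      return maps[stripeFor(key)].get(key);
    }
  }

  V put(K key, V value) {
    // A writer blocks only threads touching the same stripe; threads
    // accessing other stripes proceed in parallel.
    synchronized (stripes[stripeFor(key)]) {
      return maps[stripeFor(key)].put(key, value);
    }
  }
}
```

FastAndFurious applies this style of small-scope locking to the customer and product indexes; it is enabled on the command
line with `-dFastAndFurious=true`.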

## Interpreting Results

The report displays response times for each of the various distinct operations that are performed by the Extremem workload. The average response times give an approximation of overall performance. A lower average response time corresponds to improved throughput.
@@ -37,6 +37,7 @@ class Configuration {

static final boolean DefaultReportIndividualThreads = false;
static final boolean DefaultReportCSV = false;
static final boolean DefaultFastAndFurious = false;

static final int DefaultDictionarySize = 25000;
static final String DefaultDictionaryFile = "/usr/share/dict/words";
@@ -94,6 +95,7 @@ class Configuration {
private int CustomerThreads;
private int ServerThreads;

private boolean FastAndFurious;
private boolean ReportIndividualThreads;
private boolean ReportCSV;

@@ -197,6 +199,8 @@ void initialize(ExtrememThread t) {
ProductReviewLength = DefaultProductReviewLength;
RandomSeed = DefaultRandomSeed;

FastAndFurious = DefaultFastAndFurious;

SimulationDuration = new RelativeTime(t, DefaultDurationMinutes * 60, 0);
SimulationDuration.changeLifeSpan(t, LifeSpan.NearlyForever);

@@ -252,6 +256,7 @@ void initialize(ExtrememThread t) {
}

private static String[] boolean_patterns = {
"FastAndFurious",
"ReportCSV",
"ReportIndividualThreads",
};
@@ -352,11 +357,16 @@ else if (booleanString.equals("true"))

switch (index) {
case 0:
if (keyword.equals("FastAndFurious")) {
FastAndFurious = b;
break;
}
case 1:
if (keyword.equals("ReportCSV")) {
ReportCSV = b;
break;
}
case 2:
if (keyword.equals("ReportIndividualThreads")) {
ReportIndividualThreads = b;
break;
@@ -724,6 +734,10 @@ String DictionaryFile() {
return DictionaryFile();
}

boolean FastAndFurious() {
return FastAndFurious;
}

int MaxArrayLength() {
return MaxArrayLength;
}
@@ -6,6 +6,8 @@
class Customer extends ExtrememObject {
private static final int InitialSaveForLaterQueueSize = 8;

private boolean deceased;

private final String name;
private final long id;

@@ -21,6 +23,7 @@ class Customer extends ExtrememObject {
super(t, ls);
final Polarity Grow = Polarity.Expand;
final MemoryLog log = t.memoryLog();
this.deceased = false;
this.name = name;
this.id = uniq_id;
sflq = new BrowsingHistory [InitialSaveForLaterQueueSize];
@@ -45,6 +48,12 @@ long id () {
return id;
}

synchronized boolean isDeceased() {
return deceased;
}

// Note that size may change asynchronously when new items are added
// to the browsing history by other threads.
synchronized int browsingHistorySize() {
return sflq.length;
}
@@ -54,39 +63,51 @@ synchronized void transactSale(ExtrememThread t, AbsoluteTime release_time,
purchase_hash += sale.hash(release_time);
}

// s4l was instantiated by the CustomerThread, has been associated with the BrowsingHistoryQueue of that CustomerThread,
// and has been enqueued on that BrowsingHistoryQueue. Here, we make note that the BrowsingHistory object corresponds
// to this Customer so that we can eliminate it if this Customer is deprecated.
//
// If there was a race whereby a server thread replaces this customer at "the same time" that a customer thread determines
// to save products for later on this same customer, it may be that this object is deceased upon invocation of this service.
// If this object is already deceased, its previously existing save-for-later array has already been expunged from the
// associated BrowsingHistoryQueue and that array will not be processed again. So we have to expunge the entry here.
//
// Return the number of new slots added to this Customer's BrowsingHistory array. Typical return value is zero.
  synchronized int addSaveForLater(ExtrememThread t, BrowsingHistory s4l) {
    int bqes = 0;               // browsing queue expansion slots
    if (deceased) {
      s4l.queue().dequeue(s4l);
      s4l.garbageFootprint(t);
    } else {
      if (csfl >= sflq.length) { // double the size of existing queue
        final Polarity Grow = Polarity.Expand;
        int old_size = sflq.length;
        final MemoryLog log = t.memoryLog();
        final MemoryLog garbage = t.garbageLog();

        log.accumulate(LifeSpan.NearlyForever,
                       MemoryFlavor.ArrayObject, Grow, 1);
        log.accumulate(LifeSpan.NearlyForever,
                       MemoryFlavor.ArrayReference, Grow, old_size);

        bqes = old_size;
        BrowsingHistory[] expanded_array = new BrowsingHistory [2 * old_size];
        for (int i = 0; i < csfl; i++) {
          expanded_array [i] = sflq [fsfl++];
          if (fsfl == old_size)
            fsfl = 0;
        }
        sflq = expanded_array; // old array becomes garbage

        garbage.accumulate(LifeSpan.NearlyForever,
                           MemoryFlavor.ArrayObject, Grow, 1);
        garbage.accumulate(LifeSpan.NearlyForever,
                           MemoryFlavor.ArrayReference, Grow, old_size);
        fsfl = 0;
      }
      int queue_length = sflq.length;
      sflq [(fsfl + csfl++) % queue_length] = s4l;
    }
    return bqes;
  }

@@ -163,6 +184,7 @@ synchronized void retireOneSaveForLater (BrowsingHistory h) {
// instance's save-for-later queue.
synchronized int prepareForDemise(ExtrememThread t) {
int index = fsfl;
this.deceased = true;
for (int i = 0; i < csfl; i++) {
BrowsingHistory h = sflq[index];
h.queue().dequeue(h);
@@ -261,8 +261,8 @@ public void runExtreme() {
c, p, expiration,
browsing_queue);
expiration.garbageFootprint(this);
browsing_queue.enqueue(h);
all_customers.addSaveForLater(c, this, h);
// Garbage collection of the h BrowsingHistory object is
// the "responsibility" of the ServerThread. After the
// ServerThread expires h because its expiration time has
@@ -116,10 +116,19 @@ private String randomDistinctName (ExtrememThread t) {
}

Customer selectRandomCustomer(ExtrememThread t) {
Customer result;
if (config.FastAndFurious()) {
int index = t.randomUnsignedInt() % config.NumCustomers();
synchronized (customer_names) {
String name = customer_names.get(index);
result = customer_map.get(name);
}
} else {
RandomSelector rs = new RandomSelector(t, LifeSpan.Ephemeral, this);
cc.actAsReader(rs);
result = rs.one;
rs.garbageFootprint(t);
}
return result;
}

@@ -131,9 +140,51 @@ Customer controlledSelectRandomCustomer(ExtrememThread t) {
}

void replaceRandomCustomer(ExtrememThread t) {
if (config.FastAndFurious()) {
String new_customer_name = randomDistinctName(t);
long new_customer_no;
Customer obsolete_customer;
synchronized (this) {
new_customer_no = next_customer_no++;
}
Customer new_customer = new Customer(t, LifeSpan.NearlyForever, new_customer_name, new_customer_no);
int replacement_index = t.randomUnsignedInt() % config.NumCustomers();
synchronized (customer_names) {
String replacement_name = customer_names.get(replacement_index);
customer_names.set(replacement_index, new_customer_name);

synchronized(customer_map) {
obsolete_customer = customer_map.remove(replacement_name);
customer_map.put(new_customer_name, new_customer);
}
}
// Do memory accounting outside synchronized block
MemoryLog log = t.memoryLog();
int new_customer_length = new_customer_name.length();
Util.convertEphemeralString(t, this.intendedLifeSpan(), new_customer_length);
adjust_cncl(new_customer_length);

// Give the decommissioned customer opportunity to unhook saved-for-later products.
adjust_cbhs(-obsolete_customer.prepareForDemise(t));
String obsolete_customer_name = obsolete_customer.name();
int obsolete_len = obsolete_customer_name.length();
Util.abandonNonEphemeralString(t, this.intendedLifeSpan(), obsolete_len);
adjust_cncl(-obsolete_len);
// The obsolete_customer is not garbage collected until it is no longer referenced from any pending sales
// transactions. We'll account for its garbage here rather than adding logic to reclaim Customer memory immediately
// following particular sales transactions.
obsolete_customer.garbageFootprint(t);

// Abandon the memory for the obsolete HashEntry
Util.abandonHashEntry(t, this.intendedLifeSpan());

adjust_cbhs(new_customer.browsingHistorySize());
Util.addHashEntry(t, this.intendedLifeSpan());
} else {
RandomReplacer rr = new RandomReplacer(t, LifeSpan.Ephemeral, this);
cc.actAsWriter(rr);
rr.garbageFootprint(t);
}
}

// Returns previous value.
@@ -230,8 +281,12 @@ void tallyMemory(MemoryLog log, LifeSpan ls, Polarity p) {
}

void report(ExtrememThread t) {
if (config.FastAndFurious()) {
Report.output("No Customers concurrency report since configuration is FastAndFurious");
} else {
Report.output("Customers concurrency report:");
cc.report(t, config.ReportCSV());
}
}

/*