Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into ivan/state-validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ichub committed Dec 19, 2023
2 parents c0a2128 + 54ec729 commit 0679204
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
useUserForcedToLogout
} from "../../src/appHooks";
import { isDefaultSubscription } from "../../src/defaultSubscriptions";
import { saveSubscriptions } from "../../src/localstorage";
import {
clearAllPendingRequests,
pendingAddSubscriptionRequestKey,
Expand Down Expand Up @@ -89,6 +90,7 @@ export function AddSubscriptionScreen() {
setFetchedProviderName(response.providerName);
if (!subs.getProvider(response.providerUrl)) {
subs.addProvider(response.providerUrl, response.providerName);
saveSubscriptions(subs);
}
})
.catch((e) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import urljoin from "url-join";
import { validate } from "uuid";
import { appConfig } from "../../../src/appConfig";
import { useDispatch, useSubscriptions } from "../../../src/appHooks";
import { saveSubscriptions } from "../../../src/localstorage";

export const DEFAULT_FROG_SUBSCRIPTION_PROVIDER_URL = `${appConfig.frogCryptoServer}/frogcrypto/feeds`;

Expand All @@ -30,6 +31,7 @@ export function useInitializeFrogSubscriptions(): (
DEFAULT_FROG_SUBSCRIPTION_PROVIDER_URL,
FrogCryptoFolderName
);
saveSubscriptions(subs); // If the subscription was new, make sure it's saved.

function parseAndAddFeed(feed: Feed, deeplink: boolean): boolean {
// skip any feeds that are already subscribed to.
Expand Down
3 changes: 0 additions & 3 deletions apps/passport-client/pages/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import {
saveCheckedInOfflineTickets,
saveIdentity,
saveOfflineTickets,
saveSubscriptions,
saveUsingLaserScanner
} from "../src/localstorage";
import { registerServiceWorker } from "../src/registerServiceWorker";
Expand Down Expand Up @@ -411,8 +410,6 @@ async function loadInitialState(): Promise<AppState> {
const checkedInOfflineDevconnectTickets =
loadCheckedInOfflineDevconnectTickets();

subscriptions.updatedEmitter.listen(() => saveSubscriptions(subscriptions));

let modal = { modalType: "none" } as AppState["modal"];

if (
Expand Down
37 changes: 25 additions & 12 deletions apps/passport-client/src/dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -841,22 +841,30 @@ async function doSync(
serverStorageRevision: upRes.value.revision,
serverStorageHash: upRes.value.storageHash
};
} else if (upRes.error.name === "Conflict") {
// Conflicts are resolved at download time, so ensure another download.
return {
completedFirstSync: true,
extraDownloadRequested: true
};
} else {
const res: Partial<AppState> = {
completedFirstSync: true
};
// Upload failed. Update AppState if necessary, but not unnecessarily.
// AppState updates will trigger another upload attempt.
const needExtraDownload = upRes.error.name === "Conflict";
if (
state.completedFirstSync &&
(!needExtraDownload || state.extraDownloadRequested)
) {
return undefined;
}

const updates: Partial<AppState> = {};
if (!state.completedFirstSync) {
// We completed a first attempt at sync, even if it failed.
updates.completedFirstSync = true;
}
if (needExtraDownload && !state.extraDownloadRequested) {
updates.extraDownloadRequested = true;
}
if (upRes.error.name === "ValidationError") {
res.userInvalid = true;
updates.userInvalid = true;
}

return res;
return updates;
}
}

Expand Down Expand Up @@ -928,12 +936,17 @@ async function addSubscription(
if (!state.subscriptions.getProvider(providerUrl)) {
state.subscriptions.addProvider(providerUrl, providerName);
}
await state.subscriptions.subscribe(providerUrl, feed, true);
const sub = await state.subscriptions.subscribe(providerUrl, feed, true);
await saveSubscriptions(state.subscriptions);
update({
subscriptions: state.subscriptions,
loadedIssuedPCDs: false
});
dispatch(
{ type: "sync-subscription", subscriptionId: sub.id },
state,
update
);
}

async function removeSubscription(
Expand Down
171 changes: 146 additions & 25 deletions apps/passport-client/src/useSyncE2EEStorage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
serializeStorage
} from "@pcd/passport-interface";
import { PCDCollection } from "@pcd/pcd-collection";
import { PCD } from "@pcd/pcd-types";
import { Identity } from "@semaphore-protocol/identity";
import stringify from "fast-json-stable-stringify";
import { useCallback, useContext, useEffect } from "react";
Expand All @@ -32,14 +33,6 @@ import { getPackages } from "./pcdPackages";
import { useOnStateChange } from "./subscribe";
import { validateAndLogRunningAppState } from "./validateState";

// Temporary feature flag to allow the sync-merge code to be on the main branch
// before it's fully complete. When this is set to false, the behavior should
// remain as it was before sync-merge was implemented at all. Uploads will
// always overwrite existing contents, and downloads will always take new
// contents without merging.
// TODO(artwyman): Remove this when #1342 is complete.
const ENABLE_SYNC_MERGE = false;

export type UpdateBlobKeyStorageInfo = {
revision: string;
storageHash: string;
Expand Down Expand Up @@ -80,7 +73,7 @@ export async function updateBlobKeyForEncryptedStorage(
newUser.uuid,
newSalt,
encryptedStorage,
ENABLE_SYNC_MERGE ? knownServerStorageRevision : undefined
knownServerStorageRevision
);
if (changeResult.success) {
console.log(
Expand Down Expand Up @@ -188,7 +181,7 @@ export async function uploadSerializedStorage(
appConfig.zupassServer,
blobKey,
encryptedStorage,
ENABLE_SYNC_MERGE ? knownRevision : undefined
knownRevision
);

if (uploadResult.success) {
Expand Down Expand Up @@ -223,22 +216,153 @@ export type MergeStorageResult = APIResult<MergeableFields, NamedAPIError>;
* Merge the contents of local and remote states, both of which have potential
* changes from a common base state.
*
* TODO(artwyman): Describe merge algorithm.
* PCDs and subscriptions are each merged independently, using the same basic
* algorithm. If there are no differences (identical hash) then this merge
* always returns the "remote" fields, making it equivalent to a simple
* download-and-replace.
*
* The merge performed here is limited by the lack of historical revisions, or
* other information which could clarify the user's intent. E.g. we can't tell
* a remote "add" from a local "remove", and in case of replacement we don't
* know which version is newer. Without that, we can't know which version of
* data is "better". Instead we're opinionated on these tradeoffs:
* 1) Keeping data is always better than losing data. If we can't tell if an
* object was added or removed, assume it was added. This means a conflict
* may cause a removed object to reappear, but shouldn't cause a new object
* to be lost. This preference is chosen to avoid losing user data, but may
* not always be ideal, e.g. in the case of subscriptions granting
* permissions the user intended to revoke.
* 2) By default, prefer data downloaded from the server. This is mostly
* arbitrary, but given that we upload more aggressively than we download
* it approximates a "first to reach the server wins" policy. This means if
* an object is modified (with the same ID) and involved in a conflict, one
* of the two copies will be kept, determined by the timing of uploads.
*
* As a side-effect of the implementation of #2, this merge algorithm always
* replaces the PCDCollection and FeedsSubscriptionManager with new objects,
* discarding non-serialized state (like listeners on emitters, and
* subscription errors).
*
* This function always uploads a report to the server that a merge occurred,
* with some stats which we hope will be helpful to detect future problems
* and tune better merge algorithms.
*
* @param localFields the local PCDs and subscriptions currently in use. These
* are unmodified by the current merge algorithm.
* @param remoteFields the new PCDs and subscriptions downloaded from the
* server. These are modified and returned by the current merge algorithm.
* @param self a user object used to populate log messages.
* @returns the resulting PCDs and subscriptions to be used going forward.
* In the current merge algorithm, these are always modified versions of the
* `remoteFields`.
*
*/
export async function mergeStorage(
_localFields: MergeableFields,
localFields: MergeableFields,
remoteFields: MergeableFields,
self: User
): Promise<MergeStorageResult> {
console.error(
"[SYNC] sync conflict needs merge! Keeping only remote state."
);
// TODO(artwyman): Refactor this out to implement and test real merge.
requestLogToServer(appConfig.zupassServer, "sync-merge", {
user: self.uuid
// TODO(artwyman): more details for tracking.
// Merge PCDs and Subscriptions independently, and gather a unified set of
// stats to include in a report to the server.
// TODO(#1372): Detect and report on cases where objects differ with the
// same ID.
let identical = true;
let anyPCDDiffs = false;
let anySubDiffs = false;

// PCD merge: Based on PCDCollection.merge, with predicate providing filtering
// by ID, and updating stats based on how many PCDs were added.
const pcdMergeStats = {
localOnly: 0,
remoteOnly: remoteFields.pcds.size() - localFields.pcds.size(),
both: localFields.pcds.size(),
final: remoteFields.pcds.size()
};
if (
(await localFields.pcds.getHash()) != (await remoteFields.pcds.getHash())
) {
identical = false;
const pcdMergePredicate = (pcd: PCD, remotePCDs: PCDCollection) => {
if (remotePCDs.hasPCDWithId(pcd.id)) {
return false;
} else {
pcdMergeStats.localOnly++;
pcdMergeStats.remoteOnly++;
pcdMergeStats.both--;
return true;
}
};
// TODO(#1373): Attempt to preserve order while merging?
remoteFields.pcds.merge(localFields.pcds, {
shouldInclude: pcdMergePredicate
});
pcdMergeStats.final = remoteFields.pcds.size();
anyPCDDiffs = pcdMergeStats.localOnly > 0 || pcdMergeStats.remoteOnly > 0;

if (anyPCDDiffs) {
console.log("[SYNC] merged PCDS:", pcdMergeStats);
} else {
console.log(
"[SYNC] PCD merge made no changes to downloaded set: IDs are identical"
);
}
} else {
console.log("[SYNC] no merge of PCDs: hashes are identical");
}

// Subscription merge: Based on FeedSubscriptionManager.merge, with stats
// calculated based on the returned counts.
const localCount = localFields.subscriptions.getActiveSubscriptions().length;
const remoteCount =
remoteFields.subscriptions.getActiveSubscriptions().length;
const subMergeStats = {
localOnly: 0,
remoteOnly: remoteCount - localCount,
both: localCount,
final: remoteCount
};
if (
(await localFields.subscriptions.getHash()) !=
(await remoteFields.subscriptions.getHash())
) {
identical = false;
const subMergeResults = remoteFields.subscriptions.merge(
localFields.subscriptions
);
subMergeStats.localOnly += subMergeResults.newSubscriptions;
subMergeStats.remoteOnly += subMergeResults.newSubscriptions;
subMergeStats.both -= subMergeResults.newSubscriptions;
subMergeStats.final =
remoteFields.subscriptions.getActiveSubscriptions().length;
anySubDiffs = subMergeStats.localOnly > 0 || subMergeStats.remoteOnly > 0;

if (anySubDiffs) {
console.log("[SYNC] merged subscriptions:", subMergeStats);
} else {
console.log(
"[SYNC] subscription merge made no changes to downloaded set: IDs are identical"
);
}
} else {
console.log("[SYNC] no merge of subscriptions: hashes are identical");
}

// Report stats to the server for analysis.
await requestLogToServer(appConfig.zupassServer, "sync-merge", {
user: self.uuid,
identical: identical,
changedPcds: anyPCDDiffs,
changedSubscriptions: anySubDiffs,
pcdMergeStats: pcdMergeStats,
subscriptionMergeStats: subMergeStats
});
return { value: remoteFields, success: true };
return {
value: {
pcds: remoteFields.pcds,
subscriptions: remoteFields.subscriptions
},
success: true
};
}

export type SyncStorageResult = APIResult<
Expand Down Expand Up @@ -303,17 +427,14 @@ export async function downloadAndMergeStorage(
// Check if local app state has changes since the last server revision, in
// which case a merge is necessary. Otherwise we keep the downloaded state.
let [newPCDs, newSubscriptions] = [dlPCDs, dlSubscriptions];
if (
ENABLE_SYNC_MERGE &&
knownServerRevision !== undefined &&
knownServerHash !== undefined
) {
if (knownServerRevision !== undefined && knownServerHash !== undefined) {
const appStorage = await serializeStorage(
appSelf,
appPCDs,
appSubscriptions
);
if (appStorage.storageHash !== knownServerHash) {
console.warn("[SYNC] revision conflict on download needs merge!");
const mergeResult = await mergeStorage(
{ pcds: appPCDs, subscriptions: appSubscriptions },
{
Expand Down
6 changes: 3 additions & 3 deletions apps/passport-server/src/routing/routes/e2eeRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export function initE2EERoutes(
* storage, storing the new encrypted storage, and updating the user's
* salt so they can re-derive their key.
*/
app.post("/sync/v2/changeBlobKey", async (req: Request, res: Response) => {
app.post("/sync/v3/changeBlobKey", async (req: Request, res: Response) => {
const request = req.body as ChangeBlobKeyRequest;
await e2eeService.handleChangeBlobKey(request, res);
});
Expand All @@ -35,7 +35,7 @@ export function initE2EERoutes(
*
* @todo - restrict the calling of this api somehow? at least a rate limit.
*/
app.get("/sync/v2/load/", async (req: Request, res: Response) => {
app.get("/sync/v3/load/", async (req: Request, res: Response) => {
await e2eeService.handleLoad(
checkQueryParam(req, "blobKey"),
checkOptionalQueryParam(req, "knownRevision"),
Expand All @@ -55,7 +55,7 @@ export function initE2EERoutes(
* @todo - restrict + rate limit this?
* @todo - size limits?
*/
app.post("/sync/v2/save", async (req: Request, res: Response) => {
app.post("/sync/v3/save", async (req: Request, res: Response) => {
const request = req.body as UploadEncryptedStorageRequest;
await e2eeService.handleSave(request, res);
});
Expand Down
Loading

0 comments on commit 0679204

Please sign in to comment.