Skip to content

Commit

Permalink
chore: kickoff release
Browse files Browse the repository at this point in the history
  • Loading branch information
lawmicha authored Aug 28, 2024
2 parents 79d062d + cb80b91 commit 806a75c
Show file tree
Hide file tree
Showing 36 changed files with 780 additions and 186 deletions.
3 changes: 2 additions & 1 deletion Amplify/Core/Configuration/AmplifyOutputsData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ public struct AmplifyOutputsData: Codable {
public struct AmplifyOutputs {

/// A closure that resolves the `AmplifyOutputsData` configuration
let resolveConfiguration: () throws -> AmplifyOutputsData
@_spi(InternalAmplifyConfiguration)
public let resolveConfiguration: () throws -> AmplifyOutputsData

/// Resolves configuration with `amplify_outputs.json` in the main bundle.
public static let amplifyOutputs: AmplifyOutputs = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
// Placing the actual subscription work in a deferred task and
// promptly returning the filtered publisher for downstream consumption of all error messages.
defer {
Task { [weak self] in
let task = Task { [weak self] in
guard let self = self else { return }
if !(await self.isConnected) {
try await connect()
try await waitForState(.connected)
}
await self.bindCancellableToConnection(try await self.startSubscription(id))
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
await self.storeInConnectionCancellables(try await self.startSubscription(id))
}
self.storeInConnectionCancellables(task.toAnyCancellable)
}

return filterAppSyncSubscriptionEvent(with: id)
Expand Down Expand Up @@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

private func subscribeToWebSocketEvent() async {
await self.webSocketClient.publisher.sink { [weak self] _ in
let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
} receiveValue: { webSocketEvent in
Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}
await self?.storeInCancellables(task.toAnyCancellable)
}
}
.store(in: &cancellables)
self.storeInCancellables(cancellable)
}

private func resumeExistingSubscriptions() {
log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions")
for (id, _) in self.subscriptions {
Task {
Task { [weak self] in
do {
try await self.startSubscription(id).store(in: &cancellablesBindToConnection)
if let cancellable = try await self?.startSubscription(id) {
await self?.storeInConnectionCancellables(cancellable)
}
} catch {
log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
}
}
}
Expand Down Expand Up @@ -286,7 +292,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
subject.filter {
switch $0 {
case .success(let response): return response.id == id || response.type == .connectionError
case .failure(let error): return true
case .failure: return true
}
}
.map { result -> AppSyncSubscriptionEvent? in
Expand Down Expand Up @@ -350,10 +356,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}

private func bindCancellableToConnection(_ cancellable: AnyCancellable) {
cancellable.store(in: &cancellablesBindToConnection)
}

}

// MARK: - On WebSocket Events
Expand All @@ -366,8 +368,11 @@ extension AppSyncRealTimeClient {
if self.state.value == .connectionDropped {
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
Task { [weak self] in
try? await self?.connect()
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
let task = Task { [weak self] in
try? await self?.connect()
}
await self?.storeInConnectionCancellables(task.toAnyCancellable)
}
}

case let .disconnected(closeCode, reason): //
Expand Down Expand Up @@ -425,24 +430,37 @@ extension AppSyncRealTimeClient {
}
}

private func monitorHeartBeats(_ connectionAck: JSONValue?) {
func monitorHeartBeats(_ connectionAck: JSONValue?) {
let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0
log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms")
heartBeats.eraseToAnyPublisher()
let cancellable = heartBeats.eraseToAnyPublisher()
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
.first()
.sink(receiveValue: {
self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
.sink(receiveValue: { [weak self] in
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
Task { [weak self] in
await self?.reconnect()
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.reconnect()
}
await self?.storeInCancellables(task.toAnyCancellable)
}
})
.store(in: &cancellablesBindToConnection)
self.storeInConnectionCancellables(cancellable)
// start counting down
heartBeats.send(())
}
}

extension AppSyncRealTimeClient {
private func storeInCancellables(_ cancellable: AnyCancellable) {
self.cancellables.insert(cancellable)
}

private func storeInConnectionCancellables(_ cancellable: AnyCancellable) {
self.cancellablesBindToConnection.insert(cancellable)
}
}

extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never {
func toAppSyncSubscriptionEventStream() -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,4 +551,32 @@ class AppSyncRealTimeClientTests: XCTestCase {
await fulfillment(of: [startTriggered, errorReceived], timeout: 2)

}

func testReconnect_whenHeartBeatSignalIsNotReceived() async throws {
var cancellables = Set<AnyCancellable>()
let timeout = 1.0
let mockWebSocketClient = MockWebSocketClient()
let mockAppSyncRequestInterceptor = MockAppSyncRequestInterceptor()
let appSyncClient = AppSyncRealTimeClient(
endpoint: URL(string: "https://example.com")!,
requestInterceptor: mockAppSyncRequestInterceptor,
webSocketClient: mockWebSocketClient
)

// start monitoring
await appSyncClient.monitorHeartBeats(.object([
"connectionTimeoutMs": 100
]))

let reconnect = expectation(description: "webSocket triggers event to connection")
await mockWebSocketClient.actionSubject.sink { action in
switch action {
case .connect:
reconnect.fulfill()
default: break
}
}.store(in: &cancellables)
await fulfillment(of: [reconnect], timeout: 2)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation
import Amplify // Amplify.Auth
import AWSPluginsCore // AuthAWSCredentialsProvider
import AWSClientRuntime // AWSClientRuntime.CredentialsProviding
import ClientRuntime // SdkHttpRequestBuilder
import AwsCommonRuntimeKit // CommonRuntimeKit.initialize()

extension AWSCognitoAuthPlugin {


/// Creates a AWS IAM SigV4 signer capable of signing AWS AppSync requests.
///
/// **Note**. Although this method is static, **Amplify.Auth** is required to be configured with **AWSCognitoAuthPlugin** as
/// it depends on the credentials provider from Cognito through `Amplify.Auth.fetchAuthSession()`. The static type allows
/// developers to simplify their callsite without having to access the method on the plugin instance.
///
/// - Parameter region: The region of the AWS AppSync API
/// - Returns: A closure that takes in a requestand returns a signed request.
public static func createAppSyncSigner(region: String) -> ((URLRequest) async throws -> URLRequest) {
return { request in
try await signAppSyncRequest(request,
region: region)
}
}

static func signAppSyncRequest(_ urlRequest: URLRequest,
region: Swift.String,
signingName: Swift.String = "appsync",
date: ClientRuntime.Date = Date()) async throws -> URLRequest {
CommonRuntimeKit.initialize()

// Convert URLRequest to SDK's HTTPRequest
guard let requestBuilder = try createAppSyncSdkHttpRequestBuilder(
urlRequest: urlRequest) else {
return urlRequest
}

// Retrieve the credentials from credentials provider
let credentials: AWSClientRuntime.AWSCredentials
let authSession = try await Amplify.Auth.fetchAuthSession()
if let awsCredentialsProvider = authSession as? AuthAWSCredentialsProvider {
let awsCredentials = try awsCredentialsProvider.getAWSCredentials().get()
credentials = awsCredentials.toAWSSDKCredentials()
} else {
let error = AuthError.unknown("Auth session does not include AWS credentials information")
throw error
}

// Prepare signing
let flags = SigningFlags(useDoubleURIEncode: true,
shouldNormalizeURIPath: true,
omitSessionToken: false)
let signedBodyHeader: AWSSignedBodyHeader = .none
let signedBodyValue: AWSSignedBodyValue = .empty
let signingConfig = AWSSigningConfig(credentials: credentials,
signedBodyHeader: signedBodyHeader,
signedBodyValue: signedBodyValue,
flags: flags,
date: date,
service: signingName,
region: region,
signatureType: .requestHeaders,
signingAlgorithm: .sigv4)

// Sign request
guard let httpRequest = await AWSSigV4Signer.sigV4SignedRequest(
requestBuilder: requestBuilder,

signingConfig: signingConfig
) else {
return urlRequest
}

// Update original request with new headers
return setHeaders(from: httpRequest, to: urlRequest)
}

static func setHeaders(from sdkRequest: SdkHttpRequest, to urlRequest: URLRequest) -> URLRequest {
var urlRequest = urlRequest
for header in sdkRequest.headers.headers {
urlRequest.setValue(header.value.joined(separator: ","), forHTTPHeaderField: header.name)
}
return urlRequest
}

static func createAppSyncSdkHttpRequestBuilder(urlRequest: URLRequest) throws -> SdkHttpRequestBuilder? {

guard let url = urlRequest.url,
let host = url.host else {
return nil
}

var headers = urlRequest.allHTTPHeaderFields ?? [:]
headers.updateValue(host, forKey: "host")

let httpMethod = (urlRequest.httpMethod?.uppercased())
.flatMap(HttpMethodType.init(rawValue:)) ?? .get

let queryItems = URLComponents(url: url, resolvingAgainstBaseURL: false)?.queryItems?
.map { ClientRuntime.SDKURLQueryItem(name: $0.name, value: $0.value)} ?? []

let requestBuilder = SdkHttpRequestBuilder()
.withHost(host)
.withPath(url.path)
.withQueryItems(queryItems)
.withMethod(httpMethod)
.withPort(443)
.withProtocol(.https)
.withHeaders(.init(headers))
.withBody(.data(urlRequest.httpBody))

return requestBuilder
}
}

extension AWSPluginsCore.AWSCredentials {

func toAWSSDKCredentials() -> AWSClientRuntime.AWSCredentials {
if let tempCredentials = self as? AWSTemporaryCredentials {
return AWSClientRuntime.AWSCredentials(
accessKey: tempCredentials.accessKeyId,
secret: tempCredentials.secretAccessKey,
expirationTimeout: tempCredentials.expiration,
sessionToken: tempCredentials.sessionToken)
} else {
return AWSClientRuntime.AWSCredentials(
accessKey: accessKeyId,
secret: secretAccessKey,
expirationTimeout: Date())
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ struct AWSCognitoAuthCredentialStore {
newIdentityConfigData != nil &&
oldIdentityPoolConfiguration == newIdentityConfigData
{

// retrieve data from the old namespace and save with the new namespace
if let oldCognitoCredentialsData = try? keychain._getData(oldNameSpace) {
try? keychain._set(oldCognitoCredentialsData, key: newNameSpace)
}
} else if oldAuthConfigData != currentAuthConfig {
} else if oldAuthConfigData != currentAuthConfig &&
oldNameSpace != newNameSpace {
// Clear the old credentials
try? keychain._remove(oldNameSpace)
}
Expand Down
Loading

0 comments on commit 806a75c

Please sign in to comment.