diff --git a/flow-client/package-lock.json b/flow-client/package-lock.json index 21d139256e5..2cb80e75721 100644 --- a/flow-client/package-lock.json +++ b/flow-client/package-lock.json @@ -1529,6 +1529,7 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-3.10.1.tgz", "integrity": "sha512-Ug1RcWcrJP02hmtaXVS3axPPTTPnZjupqhgj+NnZ6BCkwSImWk/283347+x9wN+lqOdK9Eo3vsyiyDHgsmiEJw==", "dev": true, + "peer": true, "dependencies": { "@types/eslint-visitor-keys": "^1.0.0", "@typescript-eslint/experimental-utils": "3.10.1", @@ -2027,6 +2028,7 @@ "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.4.1.tgz", "integrity": "sha512-nQyp0o1/mNdbTO1PO6kHkwSrmgZ0MT/jCCpNiwbUjGoRN4dlBhqJtoQuCnEOKzgTVwg0ZWiCoQy6SxMebQVh8A==", "dev": true, + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -3126,7 +3128,8 @@ "version": "0.0.1342118", "resolved": "https://registry.npmjs.org/devtools-protocol/-/devtools-protocol-0.0.1342118.tgz", "integrity": "sha512-75fMas7PkYNDTmDyb6PRJCH7ILmHLp+BhrZGeMsa4bCh40DTxgCz2NRy5UDzII4C5KuD0oBMZ9vXKhEl6UD/3w==", - "dev": true + "dev": true, + "peer": true }, "node_modules/diff": { "version": "5.2.0", @@ -3469,6 +3472,7 @@ "resolved": "https://registry.npmjs.org/eslint/-/eslint-7.32.0.tgz", "integrity": "sha512-VHZ8gX+EDfz+97jGcgyGCyRia/dPOd6Xh9yPv8Bl1+SoaIwD+a/vlrOmGRUyOYu7MwUhc7CxqeaDZU13S4+EpA==", "dev": true, + "peer": true, "dependencies": { "@babel/code-frame": "7.12.11", "@eslint/eslintrc": "^0.4.3", @@ -3642,6 +3646,7 @@ "integrity": "sha512-ixmkI62Rbc2/w8Vfxyh1jQRTdRTF52VxwRVHl/ykPAmqG+Nb7/kNn+byLP0LxPgI7zWA16Jt82SybJInmMia3A==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@rtsao/scc": "^1.1.0", "array-includes": "^3.1.8", @@ -5488,6 +5493,7 @@ "resolved": "https://registry.npmjs.org/lit/-/lit-3.2.1.tgz", "integrity": "sha512-1BBa1E/z0O9ye5fZprPtdqnc0BFzxIxTTOO/tQFmyC/hj1O3jL4TfmLBw0WEwjAokdLwpclkvGgDJwTIh0/22w==", "license": "BSD-3-Clause", + "peer": true, "dependencies": { "@lit/reactive-element": "^2.0.4", "lit-element": "^4.1.0", @@ -6252,6 +6258,7 @@ "integrity": "sha512-tdN8qQGvNjw4CHbY+XXk0JgCXn9QiF21a55rBe5LJAU+kDyC4WQn4+awm2Xfk2lQMk5fKup9XgzTZtGkjBdP9Q==", "dev": true, "license": "MIT", + "peer": true, "bin": { "prettier": "bin-prettier.js" }, @@ -6647,6 +6654,7 @@ "resolved": "https://registry.npmjs.org/rollup/-/rollup-4.22.4.tgz", "integrity": "sha512-vD8HJ5raRcWOyymsR6Z3o6+RzfEPCnVLMFJ6vRslO1jt4LO6dUo5Qnpg7y4RkZFM2DMe3WUirkI5c16onjrc6A==", "dev": true, + "peer": true, "dependencies": { "@types/estree": "1.0.5" }, @@ -7537,6 +7545,7 @@ "integrity": "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -8944,6 +8953,7 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-3.10.1.tgz", "integrity": "sha512-Ug1RcWcrJP02hmtaXVS3axPPTTPnZjupqhgj+NnZ6BCkwSImWk/283347+x9wN+lqOdK9Eo3vsyiyDHgsmiEJw==", "dev": true, + "peer": true, "requires": { "@types/eslint-visitor-keys": "^1.0.0", "@typescript-eslint/experimental-utils": "3.10.1", @@ -9313,7 +9323,8 @@ "version": "7.4.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.4.1.tgz", "integrity": "sha512-nQyp0o1/mNdbTO1PO6kHkwSrmgZ0MT/jCCpNiwbUjGoRN4dlBhqJtoQuCnEOKzgTVwg0ZWiCoQy6SxMebQVh8A==", - "dev": true + "dev": true, + "peer": true }, "acorn-jsx": { "version": "5.3.2", @@ -10072,7 +10083,8 @@ "version": "0.0.1342118", "resolved": "https://registry.npmjs.org/devtools-protocol/-/devtools-protocol-0.0.1342118.tgz", "integrity": "sha512-75fMas7PkYNDTmDyb6PRJCH7ILmHLp+BhrZGeMsa4bCh40DTxgCz2NRy5UDzII4C5KuD0oBMZ9vXKhEl6UD/3w==", - "dev": true + "dev": true, + "peer": true }, "diff": { "version": "5.2.0", @@ -10337,6 +10349,7 @@ "resolved": "https://registry.npmjs.org/eslint/-/eslint-7.32.0.tgz", "integrity": "sha512-VHZ8gX+EDfz+97jGcgyGCyRia/dPOd6Xh9yPv8Bl1+SoaIwD+a/vlrOmGRUyOYu7MwUhc7CxqeaDZU13S4+EpA==", "dev": true, + "peer": true, "requires": { "@babel/code-frame": "7.12.11", "@eslint/eslintrc": "^0.4.3", @@ -10503,6 +10516,7 @@ "resolved": "https://registry.npmjs.org/eslint-plugin-import/-/eslint-plugin-import-2.31.0.tgz", "integrity": "sha512-ixmkI62Rbc2/w8Vfxyh1jQRTdRTF52VxwRVHl/ykPAmqG+Nb7/kNn+byLP0LxPgI7zWA16Jt82SybJInmMia3A==", "dev": true, + "peer": true, "requires": { "@rtsao/scc": "^1.1.0", "array-includes": "^3.1.8", @@ -11780,6 +11794,7 @@ "version": "3.2.1", "resolved": "https://registry.npmjs.org/lit/-/lit-3.2.1.tgz", "integrity": "sha512-1BBa1E/z0O9ye5fZprPtdqnc0BFzxIxTTOO/tQFmyC/hj1O3jL4TfmLBw0WEwjAokdLwpclkvGgDJwTIh0/22w==", + "peer": true, "requires": { "@lit/reactive-element": "^2.0.4", "lit-element": "^4.1.0", @@ -12341,7 +12356,8 @@ "version": "2.8.8", "resolved": "https://registry.npmjs.org/prettier/-/prettier-2.8.8.tgz", "integrity": "sha512-tdN8qQGvNjw4CHbY+XXk0JgCXn9QiF21a55rBe5LJAU+kDyC4WQn4+awm2Xfk2lQMk5fKup9XgzTZtGkjBdP9Q==", - "dev": true + "dev": true, + "peer": true }, "prettier-linter-helpers": { "version": "1.0.0", @@ -12610,6 +12626,7 @@ "resolved": "https://registry.npmjs.org/rollup/-/rollup-4.22.4.tgz", "integrity": "sha512-vD8HJ5raRcWOyymsR6Z3o6+RzfEPCnVLMFJ6vRslO1jt4LO6dUo5Qnpg7y4RkZFM2DMe3WUirkI5c16onjrc6A==", "dev": true, + "peer": true, "requires": { "@rollup/rollup-android-arm-eabi": "4.22.4", "@rollup/rollup-android-arm64": "4.22.4", @@ -13252,7 +13269,8 @@ "version": "5.7.3", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.7.3.tgz", "integrity": "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw==", - "dev": true + "dev": true, + "peer": true }, "typical": { "version": "4.0.0", diff --git a/flow-client/src/main/frontend/Flow.ts b/flow-client/src/main/frontend/Flow.ts index ca81ad8cf1a..139554e5b3a 100644 --- a/flow-client/src/main/frontend/Flow.ts +++ b/flow-client/src/main/frontend/Flow.ts @@ -9,6 +9,228 @@ export interface FlowConfig { imports?: () => Promise; } +// ============================================================================ +// SSE Push - Inlined to avoid module resolution issues in non-optimized builds +// ============================================================================ + +interface SseAtmosphereResponse { + request: SseAtmosphereRequest; + transport: string; + state: string; + status: number; + responseBody: string; + headers: (name: string) => string | null; + closedByClientTimeout: boolean; + error?: string; +} + +interface SseAtmosphereRequest { + url: string; + maxReconnectOnClose?: number; + reconnectInterval?: number; + headers?: Record string)>; + onOpen?: (response: SseAtmosphereResponse) => void; + onMessage?: (response: SseAtmosphereResponse) => void; + onError?: (response: SseAtmosphereResponse) => void; + onClose?: (response: SseAtmosphereResponse) => void; + onReconnect?: (response: SseAtmosphereResponse, request: { timeout: number; attempts: number }) => void; + onTransportFailure?: (errorMsg: string, response: SseAtmosphereResponse) => void; +} + +class SseConnection { + private options: SseAtmosphereRequest; + private eventSource: EventSource | null = null; + private state: 'closed' | 'connecting' | 'connected' = 'closed'; + private reconnectAttempts = 0; + private maxReconnectAttempts: number; + private reconnectInterval: number; + private reconnectTimeoutId: ReturnType | null = null; + private url: string; + + constructor(options: SseAtmosphereRequest) { + this.options = options; + this.maxReconnectAttempts = options.maxReconnectOnClose ?? 25; + this.reconnectInterval = options.reconnectInterval ?? 5000; + this.url = this.buildSseUrl(options.url); + this.connect(); + } + + private buildSseUrl(baseUrl: string): string { + let url = baseUrl; + if (url.includes('v-r=push')) { + url = url.replace('v-r=push', 'v-r=sse'); + } else if (url.includes('?')) { + url += '&v-r=sse'; + } else { + url += '?v-r=sse'; + } + return url; + } + + private connect(): void { + if (this.eventSource) { + this.eventSource.close(); + this.eventSource = null; + } + try { + this.state = 'connecting'; + this.eventSource = new EventSource(this.url, { withCredentials: true }); + + this.eventSource.onopen = () => { + this.state = 'connected'; + this.reconnectAttempts = 0; + const response = this.createResponse('open'); + response.transport = 'sse'; + response.state = 'opening'; + this.options.onOpen?.(response); + }; + + this.eventSource.onmessage = (event) => { + this.handleMessage(event.data); + }; + + this.eventSource.addEventListener('uidl', (event) => { + this.handleMessage((event as MessageEvent).data); + }); + + this.eventSource.addEventListener('sessionExpired', () => { + const response = this.createResponse('error'); + response.status = 401; + response.error = 'Session expired'; + this.options.onError?.(response); + this.close(); + }); + + this.eventSource.onerror = () => { + if (this.state === 'closed') return; + + // Don't rely on browser's auto-reconnection - it's inconsistent across + // browsers (Safari, Chrome, Firefox behave differently). Take full + // control of reconnection by closing the EventSource and managing + // reconnection ourselves. + if (this.eventSource) { + this.eventSource.close(); + this.eventSource = null; + } + this.state = 'closed'; + this.scheduleReconnect(); + }; + } catch (e) { + const response = this.createResponse('error'); + response.error = e instanceof Error ? e.message : String(e); + this.options.onTransportFailure?.(response.error, response); + } + } + + private scheduleReconnect(): void { + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + // Max attempts exceeded - report error + const response = this.createResponse('error'); + response.error = 'Max reconnection attempts exceeded'; + this.options.onError?.(response); + this.options.onClose?.(response); + return; + } + + this.reconnectAttempts += 1; + const response = this.createResponse('reconnect'); + + // Notify about reconnection attempt (this integrates with Vaadin's connection state) + this.options.onReconnect?.(response, { + timeout: this.reconnectInterval, + attempts: this.reconnectAttempts + }); + + // Schedule actual reconnection - use shorter interval for first few attempts + const interval = this.reconnectAttempts <= 3 ? 1000 : this.reconnectInterval; + this.reconnectTimeoutId = setTimeout(() => { + this.reconnectTimeoutId = null; + if (this.state !== 'closed') return; // Already reconnected or closed by user + this.connect(); + }, interval); + } + + private handleMessage(data: string): void { + const response = this.createResponse('message'); + response.responseBody = data; + response.state = 'messageReceived'; + response.status = 200; + this.options.onMessage?.(response); + } + + private createResponse(type: string): SseAtmosphereResponse { + return { + request: this.options, + transport: 'sse', + state: type, + status: 200, + responseBody: '', + headers: () => null, + closedByClientTimeout: false + }; + } + + push(message: string): void { + const xhr = new XMLHttpRequest(); + let { url } = this.options; + if (url.includes('v-r=sse')) { + url = url.replace('v-r=sse', 'v-r=uidl'); + } else if (url.includes('v-r=push')) { + url = url.replace('v-r=push', 'v-r=uidl'); + } + xhr.open('POST', url, true); + xhr.setRequestHeader('Content-Type', 'application/json; charset=UTF-8'); + if (this.options.headers) { + for (const header of Object.keys(this.options.headers)) { + let value = this.options.headers[header]; + if (typeof value === 'function') { + value = value(); + } + xhr.setRequestHeader(header, value); + } + } + xhr.onreadystatechange = () => { + if (xhr.readyState === 4 && (xhr.status < 200 || xhr.status >= 300)) { + const response = this.createResponse('error'); + response.status = xhr.status; + this.options.onError?.(response); + } + }; + xhr.send(message); + } + + close(): void { + this.state = 'closed'; + if (this.reconnectTimeoutId) { + clearTimeout(this.reconnectTimeoutId); + this.reconnectTimeoutId = null; + } + if (this.eventSource) { + this.eventSource.close(); + this.eventSource = null; + } + const response = this.createResponse('closed'); + response.state = 'closed'; + this.options.onClose?.(response); + } +} + +function initSsePush(): void { + const $wnd = window as any; + $wnd.vaadinPush = $wnd.vaadinPush || {}; + $wnd.vaadinPush.atmosphere = { + version: 'sse-1.0', + subscribe(options: SseAtmosphereRequest) { + return new SseConnection(options); + }, + unsubscribeUrl(_url: string): void { + // SSE connections are closed via the connection object + } + }; +} + +// ============================================================================ + class FlowUiInitializationError extends Error {} interface AppConfig { @@ -20,6 +242,7 @@ interface AppConfig { interface AppInitResponse { appConfig: AppConfig; pushScript?: string; + useSsePush?: boolean; } interface Router { @@ -311,9 +534,12 @@ export class Flow { // Initialize server side UI this.response = await this.flowInitUi(); - const { pushScript, appConfig } = this.response; + const { pushScript, useSsePush, appConfig } = this.response; - if (typeof pushScript === 'string') { + // Initialize push: use embedded SSE push or load external Atmosphere script + if (useSsePush) { + initSsePush(); + } else if (typeof pushScript === 'string') { await this.loadScript(pushScript); } const { appId } = appConfig; diff --git a/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java b/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java index 0ac2d07024c..2020cf7959d 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java @@ -790,10 +790,22 @@ public void onError(ResourceLoadEvent event) { private String getVersionedPushJs() { String pushJs; - if (registry.getApplicationConfiguration().isProductionMode()) { - pushJs = ApplicationConstants.VAADIN_PUSH_JS; + boolean isProductionMode = registry.getApplicationConfiguration() + .isProductionMode(); + + // Check if SSE transport is configured + String configuredTransport = getPushConfiguration().getParameters() + .get(TRANSPORT_KEY); + boolean useSse = "sse".equals(configuredTransport); + + if (useSse) { + // Use SSE push script (no Atmosphere dependency) + pushJs = isProductionMode ? ApplicationConstants.VAADIN_SSE_PUSH_JS + : ApplicationConstants.VAADIN_SSE_PUSH_DEBUG_JS; } else { - pushJs = ApplicationConstants.VAADIN_PUSH_DEBUG_JS; + // Use Atmosphere-based push script + pushJs = isProductionMode ? ApplicationConstants.VAADIN_PUSH_JS + : ApplicationConstants.VAADIN_PUSH_DEBUG_JS; } return pushJs; } diff --git a/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java b/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java index 27199b023bd..d94216aa50d 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/DefaultConnectionStateHandler.java @@ -502,7 +502,12 @@ public void pushReconnectPending(PushConnection pushConnection) { debug("pushReconnectPending(" + pushConnection.getTransportType() + ")"); Console.debug("Reopening push connection"); - if (pushConnection.isBidirectional()) { + // SSE reliably notifies when connection is re-established (via onopen), + // so it should be treated like bidirectional connections for this + // purpose + boolean providesReconnectionFeedback = pushConnection.isBidirectional() + || "sse".equals(pushConnection.getTransportType()); + if (providesReconnectionFeedback) { // Lost connection for a connection which will tell us when the // connection is available again handleRecoverableError(Type.PUSH, null); diff --git a/flow-server/src/main/java/com/vaadin/flow/component/PushConfiguration.java b/flow-server/src/main/java/com/vaadin/flow/component/PushConfiguration.java index a5739390c57..0cfddd9f609 100644 --- a/flow-server/src/main/java/com/vaadin/flow/component/PushConfiguration.java +++ b/flow-server/src/main/java/com/vaadin/flow/component/PushConfiguration.java @@ -24,6 +24,7 @@ import com.vaadin.flow.server.communication.AtmospherePushConnection; import com.vaadin.flow.server.communication.PushConnection; import com.vaadin.flow.server.communication.PushConnectionFactory; +import com.vaadin.flow.server.communication.SsePushConnection; import com.vaadin.flow.shared.communication.PushMode; import com.vaadin.flow.shared.ui.Transport; @@ -223,7 +224,10 @@ public void setPushMode(PushMode pushMode) { session.checkHasLock(); - if (pushMode.isEnabled() + Transport transport = getTransport(); + + // SSE transport doesn't require Atmosphere, so skip the check for SSE + if (pushMode.isEnabled() && transport != Transport.SSE && !session.getService().ensurePushAvailable()) { throw new IllegalStateException( "Push is not available. See previous log messages for more information."); @@ -236,8 +240,16 @@ public void setPushMode(PushMode pushMode) { if (!oldMode.isEnabled() && pushMode.isEnabled()) { // The push connection is initially in a disconnected state; // the client will establish the connection - ui.getInternals() - .setPushConnection(pushConnectionFactory.apply(ui)); + PushConnection connection; + if (transport == Transport.SSE) { + // Use SSE push connection (no Atmosphere dependency) + connection = new SsePushConnection(ui); + } else { + // Use the configured factory (default: + // AtmospherePushConnection) + connection = pushConnectionFactory.apply(ui); + } + ui.getInternals().setPushConnection(connection); } // Nothing to do here if disabling push; // the client will close the connection diff --git a/flow-server/src/main/java/com/vaadin/flow/server/AppShellRegistry.java b/flow-server/src/main/java/com/vaadin/flow/server/AppShellRegistry.java index b2532b27c1f..79e567669b4 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/AppShellRegistry.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/AppShellRegistry.java @@ -331,8 +331,11 @@ public void modifyPushConfiguration(PushConfiguration pushConfiguration) { Push.class.getSimpleName())); } else if (!pushAnnotations.isEmpty()) { Push push = pushAnnotations.get(0); - pushConfiguration.setPushMode(push.value()); + // Set transport BEFORE push mode, because setPushMode creates the + // connection based on the configured transport (e.g., SSE needs + // SsePushConnection instead of AtmospherePushConnection) pushConfiguration.setTransport(push.transport()); + pushConfiguration.setPushMode(push.value()); } } diff --git a/flow-server/src/main/java/com/vaadin/flow/server/BootstrapHandler.java b/flow-server/src/main/java/com/vaadin/flow/server/BootstrapHandler.java index edf4bea6c30..6f3af9883ba 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/BootstrapHandler.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/BootstrapHandler.java @@ -88,6 +88,7 @@ import com.vaadin.flow.shared.communication.PushMode; import com.vaadin.flow.shared.ui.Dependency; import com.vaadin.flow.shared.ui.LoadMode; +import com.vaadin.flow.shared.ui.Transport; import static com.vaadin.flow.server.Constants.VAADIN_MAPPING; import static com.vaadin.flow.server.frontend.FrontendUtils.EXPORT_CHUNK; @@ -324,7 +325,10 @@ public PushMode getPushMode() { .getPushMode(); } - if (pushMode.isEnabled() + // SSE transport doesn't require Atmosphere + Transport transport = getUI().getPushConfiguration() + .getTransport(); + if (pushMode.isEnabled() && transport != Transport.SSE && !getService().ensurePushAvailable()) { /* * Fall back if not supported (ensurePushAvailable will log @@ -1348,11 +1352,14 @@ protected BootstrapContext createAndInitUI(Class uiClass, PushMode pushMode = push.map(Push::value) .orElseGet(deploymentConfiguration::getPushMode); setupPushConnectionFactory(pushConfiguration, context); - pushConfiguration.setPushMode(pushMode); pushConfiguration.setPushServletMapping( BootstrapHandlerHelper.determinePushServletMapping(session)); + // Set transport BEFORE push mode, because setPushMode creates the + // connection based on the configured transport (e.g., SSE needs + // SsePushConnection instead of AtmospherePushConnection) push.map(Push::transport).ifPresent(pushConfiguration::setTransport); + pushConfiguration.setPushMode(pushMode); // Set thread local here so it is available in init UI.setCurrent(ui); @@ -1512,12 +1519,23 @@ protected static String getPushScript(BootstrapContext context) { // Parameter appended to JS to bypass caches after version upgrade. String versionQueryParam = "?v=" + Version.getBuildHash(); + boolean isProductionMode = request.getService() + .getDeploymentConfiguration().isProductionMode(); + + // Check if SSE transport is configured + Transport transport = context.getUI().getPushConfiguration() + .getTransport(); + boolean useSse = transport == Transport.SSE; + String pushJs; - if (request.getService().getDeploymentConfiguration() - .isProductionMode()) { - pushJs = ApplicationConstants.VAADIN_PUSH_JS; + if (useSse) { + // Use SSE push script (no Atmosphere dependency) + pushJs = isProductionMode ? ApplicationConstants.VAADIN_SSE_PUSH_JS + : ApplicationConstants.VAADIN_SSE_PUSH_DEBUG_JS; } else { - pushJs = ApplicationConstants.VAADIN_PUSH_DEBUG_JS; + // Use Atmosphere-based push script + pushJs = isProductionMode ? ApplicationConstants.VAADIN_PUSH_JS + : ApplicationConstants.VAADIN_PUSH_DEBUG_JS; } // Use direct path - the already points to the servlet root, diff --git a/flow-server/src/main/java/com/vaadin/flow/server/HandlerHelper.java b/flow-server/src/main/java/com/vaadin/flow/server/HandlerHelper.java index 16d1a4df9c5..3c16ed6ad59 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/HandlerHelper.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/HandlerHelper.java @@ -120,6 +120,11 @@ public enum RequestType { */ PUSH(ApplicationConstants.REQUEST_TYPE_PUSH), + /** + * SSE push requests. + */ + SSE(ApplicationConstants.REQUEST_TYPE_SSE), + /** * Page showing that the browser is unsupported. */ diff --git a/flow-server/src/main/java/com/vaadin/flow/server/VaadinServletService.java b/flow-server/src/main/java/com/vaadin/flow/server/VaadinServletService.java index 8af361e7467..f600857323d 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/VaadinServletService.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/VaadinServletService.java @@ -38,6 +38,7 @@ import com.vaadin.flow.server.communication.FaviconHandler; import com.vaadin.flow.server.communication.IndexHtmlRequestHandler; import com.vaadin.flow.server.communication.PushRequestHandler; +import com.vaadin.flow.server.communication.SseRequestHandler; import com.vaadin.flow.server.communication.WebComponentProvider; import com.vaadin.flow.server.startup.ApplicationRouteRegistry; import com.vaadin.flow.shared.ApplicationConstants; @@ -138,6 +139,9 @@ protected List createRequestHandlers() } } + // SSE push handler - always available, no external dependencies + handlers.add(new SseRequestHandler(this)); + addBootstrapHandler(handlers); return handlers; } diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/JavaScriptBootstrapHandler.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/JavaScriptBootstrapHandler.java index 29ebe91dbfc..d549407b608 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/communication/JavaScriptBootstrapHandler.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/JavaScriptBootstrapHandler.java @@ -48,6 +48,7 @@ import com.vaadin.flow.server.VaadinServletRequest; import com.vaadin.flow.server.VaadinSession; import com.vaadin.flow.shared.ApplicationConstants; +import com.vaadin.flow.shared.ui.Transport; /** * Processes a 'start' request type from the client to initialize server session @@ -323,7 +324,15 @@ protected ObjectNode getInitialJson(VaadinRequest request, initial.set("appConfig", appConfig); if (context.getPushMode().isEnabled()) { - initial.put("pushScript", getPushScript(context)); + // Check if SSE transport is configured - if so, use embedded SSE + // push + Transport transport = context.getUI().getPushConfiguration() + .getTransport(); + if (transport == Transport.SSE) { + initial.put("useSsePush", true); + } else { + initial.put("pushScript", getPushScript(context)); + } } if (!session.getConfiguration().isProductionMode()) { initial.set("stats", getStats()); diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/SsePushConnection.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/SsePushConnection.java new file mode 100644 index 00000000000..c23b107ca5a --- /dev/null +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/SsePushConnection.java @@ -0,0 +1,335 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.flow.server.communication; + +import jakarta.servlet.AsyncContext; +import jakarta.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.PrintWriter; +import java.io.Serializable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tools.jackson.databind.JsonNode; + +import com.vaadin.flow.component.UI; +import com.vaadin.flow.internal.UsageStatistics; + +/** + * A {@link PushConnection} implementation using Server-Sent Events (SSE) + * without the Atmosphere framework. + *

+ * SSE is used for server-to-client communication while XHR is used for + * client-to-server messages (similar to the WEBSOCKET_XHR transport pattern). + *

+ * For internal use only. May be renamed or removed in a future release. + * + * @author Vaadin Ltd + * @since 24.7 + */ +public class SsePushConnection implements PushConnection, Serializable { + + /** + * Connection states for SSE push. + */ + protected enum State { + /** + * Not connected. Trying to push will set the connection state to + * PUSH_PENDING or RESPONSE_PENDING and defer sending the message until + * a connection is established. + */ + DISCONNECTED, + + /** + * Not connected. An asynchronous push is pending the opening of the + * connection. + */ + PUSH_PENDING, + + /** + * Not connected. A response to a client request is pending the opening + * of the connection. + */ + RESPONSE_PENDING, + + /** + * Connected. Messages can be sent through the connection. + */ + CONNECTED + } + + private static final String SSE_VERSION = "1.0"; + + private UI ui; + private transient State state = State.DISCONNECTED; + private transient AsyncContext asyncContext; + private transient PrintWriter writer; + private transient Object lock = new Object(); + private volatile boolean disconnecting; + + /** + * Creates an instance connected to the given UI. + * + * @param ui + * the UI to which this connection belongs + */ + public SsePushConnection(UI ui) { + this.ui = ui; + UsageStatistics.markAsUsed("flow/SsePushConnection", SSE_VERSION); + } + + @Override + public void push() { + push(true); + } + + /** + * Pushes pending state changes and client RPC calls to the client. If + * {@code isConnected()} is false, defers the push until a connection is + * established. + * + * @param async + * True if this push asynchronously originates from the server, + * false if it is a response to a client request. + */ + public void push(boolean async) { + if (disconnecting || !isConnected()) { + if (disconnecting) { + getLogger().debug( + "Disconnection in progress, ignoring push request"); + } + if (async && state != State.RESPONSE_PENDING) { + state = State.PUSH_PENDING; + } else { + state = State.RESPONSE_PENDING; + } + } else { + synchronized (lock) { + try { + JsonNode response = new UidlWriter().createUidl(getUI(), + async); + sendMessage("for(;;);[" + response + "]"); + } catch (Exception e) { + throw new RuntimeException("Push failed", e); + } + } + } + } + + /** + * Sends the given message to the current client using SSE format. Cannot be + * called if {@link #isConnected()} returns false. + * + * @param message + * The message to send + */ + protected void sendMessage(String message) { + assert isConnected() : "Cannot send message when not connected"; + + synchronized (lock) { + try { + // SSE format: event type, id, and data fields + writer.write("event: uidl\n"); + writer.write("id: " + (ui.getInternals().getServerSyncId() - 1) + + "\n"); + // Data can span multiple lines - each line needs "data: " + // prefix + String[] lines = message.split("\n", -1); + for (String line : lines) { + writer.write("data: " + line + "\n"); + } + writer.write("\n"); // Empty line ends the event + writer.flush(); + + if (writer.checkError()) { + getLogger() + .debug("Error detected while writing SSE message"); + connectionLost(); + } + } catch (Exception e) { + getLogger().debug("Failed to send SSE message", e); + connectionLost(); + } + } + } + + @Override + public boolean isConnected() { + assert state != null; + return state == State.CONNECTED; + } + + /** + * Associates this {@code SsePushConnection} with the given + * {@link AsyncContext} representing an established SSE connection. If + * already connected, calls {@link #disconnect()} first. If there is a + * deferred push, carries it out via the new connection. + * + * @param asyncContext + * the async context to associate this connection with + * @throws IOException + * if setting up the SSE connection fails + */ + public void connect(AsyncContext asyncContext) throws IOException { + assert asyncContext != null; + + if (isConnected()) { + disconnect(); + } + + this.asyncContext = asyncContext; + + // Set up SSE response headers + HttpServletResponse response = (HttpServletResponse) asyncContext + .getResponse(); + response.setContentType("text/event-stream"); + response.setCharacterEncoding("UTF-8"); + // Prevent caching + response.setHeader("Cache-Control", + "no-cache, no-store, must-revalidate"); + response.setHeader("Pragma", "no-cache"); + // Keep connection alive + response.setHeader("Connection", "keep-alive"); + // Disable buffering for immediate delivery + response.setHeader("X-Accel-Buffering", "no"); + + this.writer = response.getWriter(); + + State oldState = state; + state = State.CONNECTED; + + // Send initial connection event + sendConnectionEstablishedEvent(); + + if (oldState == State.PUSH_PENDING + || oldState == State.RESPONSE_PENDING) { + // Sending a "response" message (async=false) also takes care of a + // pending push, but not vice versa + push(oldState == State.PUSH_PENDING); + } + } + + /** + * Sends an event to the client indicating that the SSE connection has been + * established. + */ + private void sendConnectionEstablishedEvent() { + try { + writer.write("event: connected\n"); + writer.write("data: {\"uiId\":" + ui.getUIId() + "}\n\n"); + writer.flush(); + } catch (Exception e) { + getLogger().debug("Failed to send connection established event", e); + } + } + + /** + * Gets the UI associated with this connection. + * + * @return the UI associated with this connection. + */ + protected UI getUI() { + return ui; + } + + /** + * Gets the AsyncContext associated with this connection. + * + * @return The AsyncContext associated with this connection or null if + * connection not open. + */ + protected AsyncContext getAsyncContext() { + return asyncContext; + } + + @Override + public void disconnect() { + // If a disconnection is already happening on another thread it is safe + // to skip the operation + if (disconnecting) { + getLogger().debug( + "Disconnection already in progress, ignoring request"); + return; + } + + synchronized (lock) { + if (!isConnected() || asyncContext == null) { + getLogger().debug( + "Disconnection already happened, ignoring request"); + return; + } + try { + disconnecting = true; + + // Complete the async context to close the connection + try { + asyncContext.complete(); + } catch (Exception e) { + getLogger().debug("Error when completing async context", e); + } + connectionLost(); + } finally { + disconnecting = false; + } + } + } + + /** + * Called when the connection to the client has been lost. + */ + public void connectionLost() { + asyncContext = null; + writer = null; + if (state == State.CONNECTED) { + state = State.DISCONNECTED; + } + } + + /** + * Returns the state of this connection. + * + * @return the state of this connection + */ + protected State getState() { + return state; + } + + /** + * Reinitializes this PushConnection after deserialization. The connection + * is initially in disconnected state; the client will handle the + * reconnecting. + * + * @param stream + * the object to read + * @throws IOException + * if an IO error occurred + * @throws ClassNotFoundException + * if the class of the stream object could not be found + */ + private void readObject(ObjectInputStream stream) + throws IOException, ClassNotFoundException { + stream.defaultReadObject(); + state = State.DISCONNECTED; + disconnecting = false; + lock = new Object(); + } + + private static Logger getLogger() { + return LoggerFactory.getLogger(SsePushConnection.class.getName()); + } +} diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/SseRequestHandler.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/SseRequestHandler.java new file mode 100644 index 00000000000..c84e4ea181f --- /dev/null +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/SseRequestHandler.java @@ -0,0 +1,286 @@ +/* + * Copyright 2000-2025 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.vaadin.flow.server.communication; + +import jakarta.servlet.AsyncContext; +import jakarta.servlet.AsyncEvent; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.vaadin.flow.component.UI; +import com.vaadin.flow.server.HandlerHelper; +import com.vaadin.flow.server.HandlerHelper.RequestType; +import com.vaadin.flow.server.RequestHandler; +import com.vaadin.flow.server.SessionExpiredHandler; +import com.vaadin.flow.server.VaadinRequest; +import com.vaadin.flow.server.VaadinResponse; +import com.vaadin.flow.server.VaadinService; +import com.vaadin.flow.server.VaadinServletRequest; +import com.vaadin.flow.server.VaadinServletResponse; +import com.vaadin.flow.server.VaadinServletService; +import com.vaadin.flow.server.VaadinSession; +import com.vaadin.flow.shared.ApplicationConstants; +import com.vaadin.flow.shared.JsonConstants; + +/** + * Handles requests to open an SSE (Server-Sent Events) push connection between + * the client and the server. SSE is a unidirectional server-to-client channel; + * client-to-server messages continue to use XHR. + *

+ * This handler does not use the Atmosphere framework. + *

+ * For internal use only. May be renamed or removed in a future release. + * + * @author Vaadin Ltd + * @since 24.7 + */ +public class SseRequestHandler + implements RequestHandler, SessionExpiredHandler { + + private final VaadinServletService service; + + /** + * Creates an instance connected to the given service. + * + * @param service + * the service this handler belongs to + */ + public SseRequestHandler(VaadinServletService service) { + this.service = service; + service.addServiceDestroyListener(event -> destroy()); + } + + @Override + public boolean handleRequest(VaadinSession session, VaadinRequest request, + VaadinResponse response) throws IOException { + + if (!HandlerHelper.isRequestType(request, RequestType.SSE)) { + return false; + } + + if (!(request instanceof VaadinServletRequest) + || !(response instanceof VaadinServletResponse)) { + throw new IllegalArgumentException( + "Only VaadinServletRequest/Response are supported for SSE"); + } + + HttpServletRequest servletRequest = (HttpServletRequest) ((VaadinServletRequest) request) + .getRequest(); + HttpServletResponse servletResponse = (HttpServletResponse) ((VaadinServletResponse) response) + .getResponse(); + + // Check if async is supported + if (!servletRequest.isAsyncSupported()) { + getLogger().error("Async not supported. SSE push unavailable."); + servletResponse.sendError( + HttpServletResponse.SC_INTERNAL_SERVER_ERROR, + "Async not supported. SSE push unavailable."); + return true; + } + + // Validate session + if (session == null) { + getLogger().debug("Session expired, sending session expired event"); + sendSessionExpired(servletResponse); + return true; + } + + // Find UI + UI ui; + session.lock(); + try { + ui = service.findUI(request); + if (ui == null) { + getLogger().warn("UI not found for SSE request"); + sendError(servletResponse, "UI not found"); + return true; + } + + // Validate push ID (CSRF protection) + String requestPushId = request + .getParameter(ApplicationConstants.PUSH_ID_PARAMETER); + if (!isPushIdValid(session, requestPushId)) { + getLogger().warn("Invalid push ID received from {}", + servletRequest.getRemoteHost()); + sendRefresh(servletResponse); + return true; + } + + // Get the SSE push connection + PushConnection pushConnection = ui.getInternals() + .getPushConnection(); + if (!(pushConnection instanceof SsePushConnection)) { + getLogger().warn( + "SSE request received but UI does not have SsePushConnection. " + + "Connection type: {}", + pushConnection != null + ? pushConnection.getClass().getName() + : "null"); + sendError(servletResponse, "SSE not configured for this UI"); + return true; + } + + SsePushConnection sseConnection = (SsePushConnection) pushConnection; + + // Start async context + AsyncContext asyncContext = servletRequest.startAsync(); + asyncContext.setTimeout(0); // No timeout for SSE + + // Set up async listener for connection lifecycle + asyncContext.addListener(new AsyncListener() { + @Override + public void onComplete(AsyncEvent event) { + handleDisconnect(sseConnection); + } + + @Override + public void onTimeout(AsyncEvent event) { + getLogger().debug("SSE connection timed out"); + handleDisconnect(sseConnection); + } + + @Override + public void onError(AsyncEvent event) { + getLogger().debug("SSE connection error", + event.getThrowable()); + handleDisconnect(sseConnection); + } + + @Override + public void onStartAsync(AsyncEvent event) { + // Re-register listener if async restarted + } + }); + + // Connect the SSE push connection + sseConnection.connect(asyncContext); + + getLogger().debug("SSE connection established for UI {}", + ui.getUIId()); + + } finally { + session.unlock(); + } + + return true; + } + + /** + * Handles disconnection of an SSE connection. + * + * @param connection + * the connection that was disconnected + */ + private void handleDisconnect(SsePushConnection connection) { + getLogger().debug("SSE connection disconnected"); + connection.connectionLost(); + } + + /** + * Checks whether a given push id matches the session's push id. The + * comparison is done using a time-constant method since the push id is used + * to protect against cross-site attacks. + * + * @param session + * the vaadin session for which the check should be done + * @param requestPushId + * the push id provided in the request + * @return {@code true} if the id is valid, {@code false} otherwise + */ + private static boolean isPushIdValid(VaadinSession session, + String requestPushId) { + String sessionPushId = session.getPushId(); + if (requestPushId == null || !MessageDigest.isEqual( + requestPushId.getBytes(StandardCharsets.UTF_8), + sessionPushId.getBytes(StandardCharsets.UTF_8))) { + return false; + } + return true; + } + + /** + * Sends a session expired event to the client. + * + * @param response + * the response to write to + * @throws IOException + * if writing fails + */ + private void sendSessionExpired(HttpServletResponse response) + throws IOException { + response.setContentType(JsonConstants.JSON_CONTENT_TYPE); + response.getWriter() + .write(VaadinService.createSessionExpiredJSON(true)); + } + + /** + * Sends a refresh command to the client. + * + * @param response + * the response to write to + * @throws IOException + * if writing fails + */ + private void sendRefresh(HttpServletResponse response) throws IOException { + response.setContentType(JsonConstants.JSON_CONTENT_TYPE); + response.getWriter().write(VaadinService + .createCriticalNotificationJSON(null, null, null, null)); + } + + /** + * Sends an error message to the client. + * + * @param response + * the response to write to + * @param message + * the error message + * @throws IOException + * if writing fails + */ + private void sendError(HttpServletResponse response, String message) + throws IOException { + response.sendError(HttpServletResponse.SC_BAD_REQUEST, message); + } + + @Override + public boolean handleSessionExpired(VaadinRequest request, + VaadinResponse response) throws IOException { + // Handle session expired for SSE requests + if (!HandlerHelper.isRequestType(request, RequestType.SSE)) { + return false; + } + return handleRequest(null, request, response); + } + + /** + * Frees any resources currently in use. + */ + public void destroy() { + // No resources to free for SSE (no Atmosphere framework) + } + + private static Logger getLogger() { + return LoggerFactory.getLogger(SseRequestHandler.class.getName()); + } +} diff --git a/flow-server/src/main/java/com/vaadin/flow/shared/ApplicationConstants.java b/flow-server/src/main/java/com/vaadin/flow/shared/ApplicationConstants.java index 766e5925dda..9128895325b 100644 --- a/flow-server/src/main/java/com/vaadin/flow/shared/ApplicationConstants.java +++ b/flow-server/src/main/java/com/vaadin/flow/shared/ApplicationConstants.java @@ -89,6 +89,19 @@ public class ApplicationConstants implements Serializable { public static final String VAADIN_PUSH_DEBUG_JS = VAADIN_STATIC_FILES_PATH + "push/vaadinPush.js"; + /** + * The name of the javascript containing SSE push support. Used only for + * server-rendered HTML pages; JavaScript bootstrap uses embedded SSE push. + */ + public static final String VAADIN_SSE_PUSH_JS = VAADIN_STATIC_FILES_PATH + + "push/vaadinSsePush.js"; + + /** + * The name of the debug version of the javascript containing SSE push + * support. Same as production version since SSE push is bundled. + */ + public static final String VAADIN_SSE_PUSH_DEBUG_JS = VAADIN_SSE_PUSH_JS; + /** * Name of the parameter used to transmit the push connection identifier. */ @@ -194,6 +207,11 @@ public class ApplicationConstants implements Serializable { */ public static final String REQUEST_TYPE_PUSH = "push"; + /** + * Request type parameter value indicating an SSE push request. + */ + public static final String REQUEST_TYPE_SSE = "sse"; + /** * Request type parameter value indicating a WebComponent resynchronization * request. diff --git a/flow-server/src/main/java/com/vaadin/flow/shared/ui/Transport.java b/flow-server/src/main/java/com/vaadin/flow/shared/ui/Transport.java index 30d2b47de11..a9a114454dd 100644 --- a/flow-server/src/main/java/com/vaadin/flow/shared/ui/Transport.java +++ b/flow-server/src/main/java/com/vaadin/flow/shared/ui/Transport.java @@ -34,7 +34,12 @@ public enum Transport { /** * HTTP long polling. */ - LONG_POLLING("long-polling"); + LONG_POLLING("long-polling"), + /** + * Server-Sent Events (SSE) for server to client, XHR for client to server. + * Does not use the Atmosphere framework. + */ + SSE("sse"); private String identifier;