Skip to content

Commit 82b38a9

Browse files
authored
feat: Added SASL/OAUTHBEARER extensions support. (#180)
Signed-off-by: Paolo Insogna <[email protected]>
1 parent e30e7cd commit 82b38a9

File tree

9 files changed

+89
-44
lines changed

9 files changed

+89
-44
lines changed

docs/base.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,11 @@ const producer = new Producer({
119119
serializers: stringSerializers,
120120
sasl: {
121121
mechanism: 'PLAIN', // Also SCRAM-SHA-256, SCRAM-SHA-512 and OAUTHBEARER are supported
122-
// username, password or token can also be (async) functions returning a string
122+
// username, password, token and oauthBearerExtensions can also be (async) functions returning a value
123123
username: 'username', // This is used from PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
124124
password: 'password', // This is used from PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512
125125
token: 'token', // This is used from OAUTHBEARER
126+
oauthBearerExtensions: {}, // This is used from OAUTHBEARER to add extension according to RFC 7628
126127
// This is needed if your Kafka server returns a exitCode 0 when invalid credentials are sent and only stores
127128
// authentication information in auth bytes.
128129
//

src/clients/base/options.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ export const baseOptionsSchema = {
4646
username: { oneOf: [{ type: 'string' }, { function: true }] },
4747
password: { oneOf: [{ type: 'string' }, { function: true }] },
4848
token: { oneOf: [{ type: 'string' }, { function: true }] },
49+
oauthBearerExtensions: {
50+
oneOf: [{ type: 'object', patternProperties: { '.+': { type: 'string' } } }, { function: true }]
51+
},
4952
authBytesValidator: { function: true }
5053
},
5154
required: ['mechanism'],

src/network/connection.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import { defaultCrypto, type ScramAlgorithm } from '../protocol/sasl/scram-sha.t
3232
import { Writer } from '../protocol/writer.ts'
3333
import { loggers } from '../utils.ts'
3434

35-
export type SASLCredentialProvider = () => string | Promise<string>
35+
export type SASLCredentialProvider<T = string> = () => T | Promise<T>
3636
export interface Broker {
3737
host: string
3838
port: number
@@ -44,6 +44,7 @@ export interface SASLOptions {
4444
password?: string | SASLCredentialProvider
4545
token?: string | SASLCredentialProvider
4646
authBytesValidator?: (authBytes: Buffer, callback: CallbackWithPromise<Buffer>) => void
47+
oauthBearerExtensions?: Record<string, string> | SASLCredentialProvider<Record<string, string>>
4748
}
4849

4950
export interface ConnectionOptions {
@@ -384,7 +385,7 @@ export class Connection extends EventEmitter {
384385
this.#status = ConnectionStatuses.AUTHENTICATING
385386
}
386387

387-
const { mechanism, username, password, token } = this.#options.sasl!
388+
const { mechanism, username, password, token, oauthBearerExtensions } = this.#options.sasl!
388389

389390
if (!allowedSASLMechanisms.includes(mechanism)) {
390391
this.#onConnectionError(
@@ -414,7 +415,7 @@ export class Connection extends EventEmitter {
414415
if (mechanism === SASLMechanisms.PLAIN) {
415416
saslPlain.authenticate(saslAuthenticateV2.api, this, username!, password!, callback)
416417
} else if (mechanism === SASLMechanisms.OAUTHBEARER) {
417-
saslOAuthBearer.authenticate(saslAuthenticateV2.api, this, token!, callback)
418+
saslOAuthBearer.authenticate(saslAuthenticateV2.api, this, token!, oauthBearerExtensions!, callback)
418419
} else {
419420
saslScramSha.authenticate(
420421
saslAuthenticateV2.api,

src/protocol/sasl/credential-provider.ts

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,58 +2,54 @@ import { type Callback } from '../../apis/index.ts'
22
import { AuthenticationError } from '../../errors.ts'
33
import { type SASLCredentialProvider } from '../../network/connection.ts'
44

5-
export function getCredential (
5+
export function getCredential<T> (
66
label: string,
7-
credentialOrProvider: string | SASLCredentialProvider,
8-
callback: Callback<string>
7+
credentialOrProvider: T | SASLCredentialProvider<T>,
8+
callback: Callback<T>
99
): void {
10-
if (typeof credentialOrProvider === 'string') {
11-
callback(null, credentialOrProvider)
10+
if (typeof credentialOrProvider === 'undefined') {
11+
callback(new AuthenticationError(`The ${label} should be a value or a function.`), undefined as unknown as T)
1212
return
1313
} else if (typeof credentialOrProvider !== 'function') {
14-
callback(new AuthenticationError(`The ${label} should be a string or a function.`), undefined as unknown as string)
14+
callback(null, credentialOrProvider)
1515
return
1616
}
1717

1818
try {
19-
const credential = credentialOrProvider()
19+
const credential = (credentialOrProvider as SASLCredentialProvider<T>)()
2020

21-
if (typeof credential === 'string') {
22-
callback(null, credential)
23-
return
24-
} else if (typeof (credential as Promise<string>)?.then !== 'function') {
21+
if (credential == null) {
2522
callback(
26-
new AuthenticationError(`The ${label} provider should return a string or a promise that resolves to a string.`),
27-
undefined as unknown as string
23+
new AuthenticationError(`The ${label} provider should return a string or a promise that resolves to a value.`),
24+
undefined as unknown as T
2825
)
29-
26+
return
27+
} else if (typeof (credential as Promise<string>)?.then !== 'function') {
28+
callback(null, credential as T)
3029
return
3130
}
3231

33-
credential
34-
.then(token => {
35-
if (typeof token !== 'string') {
32+
;(credential as Promise<T>)
33+
.then((result: T) => {
34+
if (result == null) {
3635
process.nextTick(
3736
callback,
38-
new AuthenticationError(`The ${label} provider should resolve to a string.`),
37+
new AuthenticationError(`The ${label} provider should resolve to a value.`),
3938
undefined as unknown as string
4039
)
4140

4241
return
4342
}
4443

45-
process.nextTick(callback, null, token)
44+
process.nextTick(callback, null, result)
4645
})
47-
.catch(error => {
48-
process.nextTick(
49-
callback,
50-
new AuthenticationError(`The ${label} provider threw an error.`, { cause: error as Error })
51-
)
46+
.catch((error: Error) => {
47+
process.nextTick(callback, new AuthenticationError(`The ${label} provider threw an error.`, { cause: error }))
5248
})
5349
} catch (error) {
5450
callback(
5551
new AuthenticationError(`The ${label} provider threw an error.`, { cause: error as Error }),
56-
undefined as unknown as string
52+
undefined as unknown as T
5753
)
5854
}
5955
}

src/protocol/sasl/oauth-bearer.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,20 @@ export function authenticate (
3333
authenticateAPI: SASLAuthenticationAPI,
3434
connection: Connection,
3535
tokenOrProvider: string | SASLCredentialProvider,
36+
extensions: Record<string, string> | SASLCredentialProvider<Record<string, string>>,
3637
callback: CallbackWithPromise<SaslAuthenticateResponse>
3738
): void
3839
export function authenticate (
3940
authenticateAPI: SASLAuthenticationAPI,
4041
connection: Connection,
41-
tokenOrProvider: string | SASLCredentialProvider
42+
tokenOrProvider: string | SASLCredentialProvider,
43+
extensions: Record<string, string> | SASLCredentialProvider<Record<string, string>>
4244
): Promise<SaslAuthenticateResponse>
4345
export function authenticate (
4446
authenticateAPI: SASLAuthenticationAPI,
4547
connection: Connection,
4648
tokenOrProvider: string | SASLCredentialProvider,
49+
extensionsOrProvider: Record<string, string> | SASLCredentialProvider<Record<string, string>>,
4750
callback?: CallbackWithPromise<SaslAuthenticateResponse>
4851
): void | Promise<SaslAuthenticateResponse> {
4952
if (!callback) {
@@ -55,7 +58,20 @@ export function authenticate (
5558
return callback!(error, undefined as unknown as SaslAuthenticateResponse)
5659
}
5760

58-
authenticateAPI(connection, Buffer.from(`n,,\x01auth=Bearer ${token}\x01\x01`), callback!)
61+
getCredential('SASL/OAUTHBEARER extensions', extensionsOrProvider ?? {}, (error, extensionsMap) => {
62+
if (error) {
63+
return callback!(error, undefined as unknown as SaslAuthenticateResponse)
64+
}
65+
66+
let extensions = ''
67+
if (extensionsMap) {
68+
for (const [key, value] of Object.entries(extensionsMap)) {
69+
extensions += `\x01${key}=${value}`
70+
}
71+
}
72+
73+
authenticateAPI(connection, Buffer.from(`n,,\x01auth=Bearer ${token}${extensions}\x01\x01`), callback!)
74+
})
5975
})
6076

6177
return callback[kCallbackPromise]

test/clients/base/sasl-oauthbearer.test.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,31 @@ test('should connect to SASL protected broker using SASL/OAUTHBEARER', async t =
5353
deepStrictEqual(metadata.brokers.get(1), saslBroker)
5454
})
5555

56+
test('should connect to SASL protected broker using SASL/OAUTHBEARER and custom extensions', async t => {
57+
const signSync = createSigner({
58+
algorithm: 'none',
59+
iss: 'kafka',
60+
aud: ['users'],
61+
sub: 'admin',
62+
expiresIn: '2h'
63+
})
64+
const token = signSync({ scope: 'test' })
65+
66+
const base = new Base({
67+
clientId: 'clientId',
68+
bootstrapBrokers: kafkaSaslBootstrapServers,
69+
strict: true,
70+
retries: 0,
71+
sasl: { mechanism: SASLMechanisms.OAUTHBEARER, token, oauthBearerExtensions: { aaa: 'bbb', ccc: 'ddd' } }
72+
})
73+
74+
t.after(() => base.close())
75+
76+
const metadata = await base.metadata({ topics: [] })
77+
78+
deepStrictEqual(metadata.brokers.get(1), saslBroker)
79+
})
80+
5681
test('should handle authentication errors', async t => {
5782
const base = new Base({
5883
clientId: 'clientId',
@@ -178,7 +203,8 @@ test('should handle async credential provider errors', async t => {
178203
retries: 0,
179204
sasl: {
180205
mechanism: 'OAUTHBEARER',
181-
async token () {
206+
token: 'token',
207+
async oauthBearerExtensions () {
182208
throw new Error('Kaboom!')
183209
}
184210
}
@@ -198,7 +224,7 @@ test('should handle async credential provider errors', async t => {
198224

199225
const authenticationError = networkError.cause
200226
deepStrictEqual(authenticationError instanceof AuthenticationError, true)
201-
deepStrictEqual(authenticationError.message, 'The SASL/OAUTHBEARER token provider threw an error.')
227+
deepStrictEqual(authenticationError.message, 'The SASL/OAUTHBEARER extensions provider threw an error.')
202228
deepStrictEqual(authenticationError.cause.message, 'Kaboom!')
203229
}
204230
})

test/clients/consumer/consumer.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3375,7 +3375,7 @@ test('#heartbeat should emit events when it was cancelled while waiting for API
33753375

33763376
consumer.on('consumer:heartbeat:start', () => {
33773377
mockMetadata(consumer, 1, null, null, (original, options, callback) => {
3378-
consumer.leaveGroup()
3378+
consumer.leaveGroup(false, () => {})
33793379
original(options, callback)
33803380
})
33813381
})
@@ -3389,7 +3389,7 @@ test('#heartbeat should emit events when it was cancelled while waiting for Hear
33893389
const consumer = createConsumer(t)
33903390

33913391
mockAPI(consumer[kConnections], heartbeatV4.api.key, null, null, (original: Function, ...args: any[]) => {
3392-
consumer.leaveGroup()
3392+
consumer.leaveGroup(false, () => {})
33933393
original(...args)
33943394
})
33953395

test/protocol/sasl/credential-provider.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ test('getCredential with string credential', (_, done) => {
1414
})
1515

1616
test('getCredential with invalid credential type', (_, done) => {
17-
const credential = 123 as any
17+
const credential = undefined as any
1818

1919
getCredential('username', credential, error => {
2020
const authenticationError = error as AuthenticationError
2121
deepStrictEqual(authenticationError instanceof AuthenticationError, true)
22-
deepStrictEqual(authenticationError.message, 'The username should be a string or a function.')
22+
deepStrictEqual(authenticationError.message, 'The username should be a value or a function.')
2323
deepStrictEqual(authenticationError.code, 'PLT_KFK_AUTHENTICATION')
2424
done()
2525
})
@@ -47,30 +47,30 @@ test('getCredential with function provider returning promise', (_, done) => {
4747
})
4848
})
4949

50-
test('getCredential with function provider returning non-string', (_, done) => {
51-
const provider = () => 123 as any
50+
test('getCredential with function provider returning non-value', (_, done) => {
51+
const provider = () => undefined
5252

5353
getCredential('username', provider, (error: Error | null) => {
5454
const authenticationError = error as AuthenticationError
5555

5656
deepStrictEqual(authenticationError instanceof AuthenticationError, true)
5757
deepStrictEqual(
5858
authenticationError.message,
59-
'The username provider should return a string or a promise that resolves to a string.'
59+
'The username provider should return a string or a promise that resolves to a value.'
6060
)
6161
deepStrictEqual(authenticationError.code, 'PLT_KFK_AUTHENTICATION')
6262
done()
6363
})
6464
})
6565

66-
test('getCredential with promise provider resolving to non-string', (_, done) => {
67-
const provider = () => Promise.resolve(123 as any)
66+
test('getCredential with promise provider resolving to non-value', (_, done) => {
67+
const provider = () => Promise.resolve(null as any)
6868

6969
getCredential('password', provider, (error: Error | null) => {
7070
const authenticationError = error as AuthenticationError
7171

7272
deepStrictEqual(authenticationError instanceof AuthenticationError, true)
73-
deepStrictEqual(authenticationError.message, 'The password provider should resolve to a string.')
73+
deepStrictEqual(authenticationError.message, 'The password provider should resolve to a value.')
7474
deepStrictEqual(authenticationError.code, 'PLT_KFK_AUTHENTICATION')
7575
done()
7676
})

test/protocol/sasl/oauthbearer.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ test('authenticate should create proper payload with the token - promise', async
3636
const result = await saslOAuthBearer.authenticate(
3737
api as unknown as saslAuthenticateV2.SASLAuthenticationAPI,
3838
mockConnection as any,
39-
'token'
39+
'token',
40+
{}
4041
)
4142

4243
// Verify the function was called
@@ -78,6 +79,7 @@ test('authenticate should create proper payload with the token - callback', (_,
7879
api as unknown as saslAuthenticateV2.SASLAuthenticationAPI,
7980
mockConnection as any,
8081
'token',
82+
{},
8183
(error, result) => {
8284
ifError(error)
8385

0 commit comments

Comments
 (0)