-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.eventHub.js
48 lines (42 loc) · 1.48 KB
/
app.eventHub.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
* Microsoft Sample Code - Copyright (c) 2020 - Licensed MIT
*/
const { EventHubConsumerClient } = require('@azure/event-hubs')
const { convertIotHubToEventHubsConnectionString } = require('./app.eventHub.connectionString')
/**
 * Reads device-to-cloud messages through the Event Hub-compatible endpoint
 * derived from an IoT Hub connection string.
 */
class EventHubReader {
  /**
   * @param {string} iotHubConnectionString - IoT Hub connection string; it is
   *   converted to an Event Hubs connection string when reading starts.
   * @param {string} consumerGroup - Consumer group passed to the consumer client.
   */
  constructor (iotHubConnectionString, consumerGroup) {
    this.iotHubConnectionString = iotHubConnectionString
    this.consumerGroup = consumerGroup
    // Populated by startReadMessage so that stopReadMessage can release them.
    this.consumerClient = undefined
    this.subscription = undefined
  }

  /**
   * Subscribes to incoming events and forwards each one to the callback.
   *
   * Failures while connecting are logged with console.error and not rethrown.
   * Calling this again without stopReadMessage in between replaces the stored
   * client/subscription handles, so stop the previous read first.
   *
   * @param {function(*, *, *): void} startReadMessageCallback - Invoked once
   *   per event with (body, enqueuedTimeUtc, device id taken from the
   *   'iothub-connection-device-id' system property).
   * @returns {Promise<void>}
   */
  async startReadMessage (startReadMessageCallback) {
    try {
      const eventHubConnectionString = await convertIotHubToEventHubsConnectionString(this.iotHubConnectionString)
      // Keep the client on the instance; a local variable would be
      // unreachable from stopReadMessage and the connection could never close.
      this.consumerClient = new EventHubConsumerClient(this.consumerGroup, eventHubConnectionString)
      this.subscription = this.consumerClient.subscribe({
        processEvents: (events, context) => {
          for (const event of events) {
            startReadMessageCallback(
              event.body,
              event.enqueuedTimeUtc,
              event.systemProperties['iothub-connection-device-id'])
          }
        },
        processError: (err, context) => {
          console.error(err.message || err)
        }
      })
    } catch (ex) {
      console.error(ex.message || ex)
    }
  }

  /**
   * Closes the active subscription and the connection to Event Hub.
   *
   * Does nothing when no read is in progress, so it is safe to call before
   * startReadMessage or more than once.
   *
   * @returns {Promise<void>}
   */
  async stopReadMessage () {
    const { subscription, consumerClient } = this
    // Clear the handles first so a repeated call cannot close them twice.
    this.subscription = undefined
    this.consumerClient = undefined
    try {
      if (subscription) {
        await subscription.close()
      }
    } finally {
      // Close the client even when closing the subscription failed.
      if (consumerClient) {
        await consumerClient.close()
      }
    }
  }
}
// The EventHubReader class is this module's only export.
module.exports = EventHubReader