forked from confluentinc/confluent-kafka-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
confluent_cloud.py
executable file
·145 lines (123 loc) · 4.93 KB
/
confluent_cloud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This is a simple example demonstrating how to produce a message to
# Confluent Cloud then read it back again.
#
# https://www.confluent.io/confluent-cloud/
#
# Auto-creation of topics is disabled in Confluent Cloud. You will need to
# use the ccloud cli to create the python-test-topic topic before running this
# example.
#
# $ ccloud topic create python-test-topic
#
# The <ccloud bootstrap servers>, <ccloud key> and <ccloud secret> parameters
# are available via the Confluent Cloud web interface. For more information,
# refer to the quick-start:
#
# https://docs.confluent.io/current/cloud-quickstart.html
#
# to execute using Python 2.7:
# $ virtualenv ccloud_example
# $ source ccloud_example/bin/activate
# $ pip install confluent_kafka
# $ python confluent_cloud.py
# $ deactivate
#
# to execute using Python 3.x:
# $ python -m venv ccloud_example
# $ source ccloud_example/bin/activate
# $ pip install confluent_kafka
# $ python confluent_cloud.py
# $ deactivate
import uuid
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
def error_cb(err):
""" The error callback is used for generic client errors. These
errors are generally to be considered informational as the client will
automatically try to recover from all errors, and no extra action
is typically required by the application.
For this example however, we terminate the application if the client
is unable to connect to any broker (_ALL_BROKERS_DOWN) and on
authentication errors (_AUTHENTICATION). """
print("Client error: {}".format(err))
if err.code() == KafkaError._ALL_BROKERS_DOWN or \
err.code() == KafkaError._AUTHENTICATION:
# Any exception raised from this callback will be re-raised from the
# triggering flush() or poll() call.
raise KafkaException(err)
# Create producer
p = Producer({
'bootstrap.servers': '<ccloud bootstrap servers>',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': '<ccloud key>',
'sasl.password': '<ccloud secret>',
'error_cb': error_cb,
})
def acked(err, msg):
"""Delivery report callback called (from flush()) on successful or failed delivery of the message."""
if err is not None:
print('Failed to deliver message: {}'.format(err.str()))
else:
print('Produced to: {} [{}] @ {}'.format(msg.topic(), msg.partition(), msg.offset()))
for n in range(0, 10):
# Produce message: this is an asynchronous operation.
# Upon successful or permanently failed delivery to the broker the
# callback will be called to propagate the produce result.
# The delivery callback is triggered from poll() or flush().
# For long running
# produce loops it is recommended to call poll() to serve these
# delivery report callbacks.
p.produce('python-test-topic', value='python test value nr {}'.format(n),
callback=acked)
# Trigger delivery report callbacks from previous produce calls.
p.poll(0)
# flush() is typically called when the producer is done sending messages to wait
# for outstanding messages to be transmitted to the broker and delivery report
# callbacks to get called. For continous producing you should call p.poll(0)
# after each produce() call to trigger delivery report callbacks.
p.flush(10)
# Create consumer
c = Consumer({
'bootstrap.servers': '<ccloud bootstrap servers>',
'sasl.mechanism': 'PLAIN',
'security.protocol': 'SASL_SSL',
'sasl.username': '<ccloud key>',
'sasl.password': '<ccloud secret>',
'group.id': str(uuid.uuid1()), # this will create a new consumer group on each invocation.
'auto.offset.reset': 'earliest',
'error_cb': error_cb,
})
c.subscribe(['python-test-topic'])
try:
while True:
msg = c.poll(0.1) # Wait for message or event/error
if msg is None:
# No message available within timeout.
# Initial message consumption may take up to `session.timeout.ms` for
# the group to rebalance and start consuming.
continue
if msg.error():
# Errors are typically temporary, print error and continue.
print('Consumer error: {}'.format(msg.error()))
continue
print('Consumed: {}'.format(msg.value()))
except KeyboardInterrupt:
pass
finally:
# Leave group and commit final offsets
c.close()