-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.py
98 lines (81 loc) · 2.84 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
import datetime
import time
import json
import typing
import logging
import dataclasses
from faker import Faker
from kafka import KafkaProducer
logging.basicConfig(level=logging.INFO)
@dataclasses.dataclass
class OrderItem:
product_id: int
quantity: int
@dataclasses.dataclass
class Order:
order_id: str
ordered_at: datetime.datetime
user_id: str
order_items: typing.List[OrderItem]
def asdict(self):
return dataclasses.asdict(self)
@classmethod
def auto(cls, fake: Faker = Faker()):
user_id = str(fake.random_int(1, 100)).zfill(3)
order_items = [
OrderItem(fake.random_int(1, 1000), fake.random_int(1, 10))
for _ in range(fake.random_int(1, 4))
]
return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)
def create(self, num: int):
return [self.auto() for _ in range(num)]
class Producer:
def __init__(self, bootstrap_servers: list, topic: str):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.producer = self.create()
def create(self):
return KafkaProducer(
bootstrap_servers=self.bootstrap_servers,
security_protocol="SASL_SSL",
ssl_check_hostname=False,
ssl_cafile="pem/ca-root.pem",
sasl_mechanism="SCRAM-SHA-256",
sasl_plain_username=os.environ["SASL_USERNAME"],
sasl_plain_password=os.environ["SASL_PASSWORD"],
value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
)
def send(self, orders: typing.List[Order]):
for order in orders:
try:
self.producer.send(
self.topic, key={"order_id": order.order_id}, value=order.asdict()
)
except Exception as e:
raise RuntimeError("fails to send a message") from e
self.producer.flush()
def serialize(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
if isinstance(obj, datetime.date):
return str(obj)
return obj
if __name__ == "__main__":
producer = Producer(
bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
topic=os.getenv("TOPIC_NAME", "orders"),
)
max_run = int(os.getenv("MAX_RUN", "-1"))
logging.info(f"max run - {max_run}")
current_run = 0
while True:
current_run += 1
logging.info(f"current run - {current_run}")
if current_run > max_run and max_run >= 0:
logging.info(f"exceeds max run, finish")
producer.producer.close()
break
producer.send(Order.auto().create(100))
time.sleep(1)