Add topic prefix to kafka #369

Open: wants to merge 6 commits into base: develop
Changes from 5 commits
18 changes: 12 additions & 6 deletions blockchainetl/jobs/exporters/kafka_exporter.py
@@ -10,17 +10,24 @@
 class KafkaItemExporter:

     def __init__(self, output, item_type_to_topic_mapping, converters=()):
         self.item_type_to_topic_mapping = item_type_to_topic_mapping
         self.converter = CompositeItemConverter(converters)
         self.connection_url = self.get_connection_url(output)
-        print(self.connection_url)
+        self.topic_prefix = self.get_topic_prefix(output)
+        print(self.connection_url, self.topic_prefix)
         self.producer = KafkaProducer(bootstrap_servers=self.connection_url)

     def get_connection_url(self, output):
         try:
             return output.split('/')[1]
-        except KeyError:
-            raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"')
+        except IndexError:
+            raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092" or "kafka/127.0.0.1:9092/<topic-prefix>"')
+
+    def get_topic_prefix(self, output):
+        try:
+            return output.split('/')[2] + "."
+        except IndexError:
+            return ''

     def open(self):
         pass
@@ -34,7 +41,7 @@ def export_item(self, item):
         if item_type is not None and item_type in self.item_type_to_topic_mapping:
             data = json.dumps(item).encode('utf-8')
             print(data)
-            return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data)
+            return self.producer.send(self.topic_prefix + self.item_type_to_topic_mapping[item_type], value=data)
         else:
             logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

@@ -45,7 +52,6 @@ def convert_items(self, items):
     def close(self):
         pass

-
 def group_by_item_type(items):
     result = collections.defaultdict(list)
     for item in items:
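The output-string parsing introduced in `get_connection_url` and `get_topic_prefix` can be illustrated with a small standalone sketch. The helper name `parse_kafka_output` is hypothetical and not part of this PR. It also treats an empty third segment (a trailing slash) as "no prefix", which is an assumption; the PR code as written would return `"."` in that case.

```python
def parse_kafka_output(output):
    """Split 'kafka/<host>:<port>[/<topic_prefix>]' into (bootstrap_servers, topic_prefix).

    Hypothetical helper for illustration only; it mirrors the two methods
    added to KafkaItemExporter in the diff, combined into one function.
    """
    parts = output.split('/')
    if len(parts) < 2 or not parts[1]:
        raise ValueError(
            'Invalid kafka output param, expected "kafka/127.0.0.1:9092" '
            'or "kafka/127.0.0.1:9092/<topic-prefix>"'
        )
    # The PR appends "." to the prefix. Assumption: an empty third segment
    # (e.g. a trailing slash) means "no prefix" rather than a bare ".".
    prefix = parts[2] + '.' if len(parts) > 2 and parts[2] else ''
    return parts[1], prefix


print(parse_kafka_output('kafka/127.0.0.1:9092'))
print(parse_kafka_output('kafka/127.0.0.1:9092/crypto_ethereum'))
```

Returning both values from one function keeps the format check in a single place, so a malformed `--output` value fails before a producer is created.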
2 changes: 1 addition & 1 deletion docs/commands.md
@@ -213,7 +213,7 @@ e.g. `-e block,transaction,log,token_transfer,trace,contract,token`.
 - For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database_name>`,
 e.g. `--output=postgresql+pg8000://postgres:[email protected]:5432/ethereum`.
 - For GCS: `--output=gs://<bucket_name>`. Make sure to install and initialize `gcloud` cli.
-- For Kafka: `--output=kafka/<host>:<port>`, e.g. `--output=kafka/127.0.0.1:9092`
+- For Kafka: `--output=kafka/<host>:<port>/<optional: topic_prefix>`, e.g. `--output=kafka/127.0.0.1:9092` or `--output=kafka/127.0.0.1:9092/crypto_ethereum`.
 - Those output types can be combined with a comma e.g. `--output=gs://<bucket_name>,projects/<your-project>/topics/crypto_ethereum`

 The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)
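To make the effect of the documented `--output` forms concrete, here is a rough sketch of how the final topic name is composed for each of them. The mapping contents and the helper `topic_for` are assumptions for illustration; the real `item_type_to_topic_mapping` is supplied by the caller and is not shown in this PR.

```python
# Assumed mapping for illustration only; the actual mapping is passed to
# KafkaItemExporter by the caller and does not appear in this diff.
item_type_to_topic_mapping = {'block': 'blocks', 'transaction': 'transactions'}


def topic_for(item_type, topic_prefix, mapping):
    """Return the prefixed topic name, or None when the item type is unmapped."""
    if item_type not in mapping:
        return None
    # Same concatenation as in export_item: prefix (already ending in ".") + topic.
    return topic_prefix + mapping[item_type]


# --output=kafka/127.0.0.1:9092 gives an empty prefix
print(topic_for('block', '', item_type_to_topic_mapping))  # blocks
# --output=kafka/127.0.0.1:9092/crypto_ethereum gives the prefix 'crypto_ethereum.'
print(topic_for('block', 'crypto_ethereum.', item_type_to_topic_mapping))  # crypto_ethereum.blocks
```

Because the prefix is empty when the third path segment is omitted, existing `--output=kafka/<host>:<port>` invocations keep their current topic names.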