Skip to content

Commit 384dbe5

Browse files
authored
Merge pull request #17 from blockchain-etl/feature/streaming_refactoring
Feature/streaming refactoring
2 parents 6d2b444 + 81e2f49 commit 384dbe5

18 files changed

+394
-186
lines changed

.dockerignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
.*
22
last_synced_block.txt
3+
pid.txt
34
output

Dockerfile_with_streaming

+6-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,9 @@ WORKDIR /$PROJECT_DIR
77
COPY . .
88
RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming]
99

10-
ENTRYPOINT ["python", "bitcoinetl"]
10+
# Add Tini
11+
ENV TINI_VERSION v0.18.0
12+
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
13+
RUN chmod +x /tini
14+
15+
ENTRYPOINT ["/tini", "--", "python", "bitcoinetl"]

README.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ Stream blockchain data continually to Google Pub/Sub:
3737

3838
```bash
3939
> export GOOGLE_APPLICATION_CREDENTIALS=/path_to_credentials_file.json
40-
> bitcoinetl stream -p http://user:pass@localhost:8332 --start-block 500000 --output projects/your-project/topics/bitcoin_blockchain
40+
> bitcoinetl stream -p http://user:pass@localhost:8332 --start-block 500000 --output projects/your-project/topics/crypto_bitcoin
4141

4242
```
4343

@@ -291,7 +291,9 @@ You can tune `--export-batch-size`, `--max-workers` for performance.
291291

292292
- This command outputs blocks and transactions to the console by default.
293293
- Use the `--output` option to specify the Google Pub/Sub topic to which blockchain data is published,
294-
e.g. `projects/your-project/topics/bitcoin_blockchain`.
294+
e.g. `projects/your-project/topics/crypto_bitcoin`. Blocks and transactions will be pushed to
295+
`projects/your-project/topics/crypto_bitcoin.blocks` and `projects/your-project/topics/crypto_bitcoin.transactions`
296+
topics.
295297
- The command periodically saves its state (the last synced block number) to the `last_synced_block.txt` file.
296298
- Specify either `--start-block` or `--last-synced-block-file` option. `--last-synced-block-file` should point to the
297299
file where the block number, from which to start streaming the blockchain data, is saved.

bitcoinetl/cli/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131

3232
@click.group()
33-
@click.version_option(version='1.2.0')
33+
@click.version_option(version='1.2.1')
3434
@click.pass_context
3535
def cli(ctx):
3636
pass

bitcoinetl/cli/stream.py

+21-9
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121
# SOFTWARE.
2222

23-
2423
import click
24+
2525
from bitcoinetl.enumeration.chain import Chain
2626
from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc
2727

2828
from blockchainetl.logging_utils import logging_basic_config
29+
from blockchainetl.streaming.streaming_utils import configure_logging, configure_signals
2930
from blockchainetl.thread_local_proxy import ThreadLocalProxy
3031

3132
logging_basic_config()
@@ -41,25 +42,36 @@
4142
'If not specified will print to console')
4243
@click.option('-s', '--start-block', default=None, type=int, help='Start block')
4344
@click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain')
44-
@click.option('-s', '--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs')
45+
@click.option('--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs')
4546
@click.option('-b', '--batch-size', default=2, type=int, help='How many blocks to batch in single request')
4647
@click.option('-B', '--block-batch-size', default=10, type=int, help='How many blocks to batch in single sync round')
4748
@click.option('-w', '--max-workers', default=5, type=int, help='The number of workers')
49+
@click.option('--log-file', default=None, type=str, help='Log file')
50+
@click.option('--pid-file', default=None, type=str, help='pid file')
4851
def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain=Chain.BITCOIN,
49-
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5):
52+
period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None):
5053
"""Streams all data types to console or Google Pub/Sub."""
54+
configure_logging(log_file)
55+
configure_signals()
56+
5157
from bitcoinetl.streaming.streaming_utils import get_item_exporter
52-
from bitcoinetl.streaming.stream import stream as do_stream
58+
from bitcoinetl.streaming.btc_streamer_adapter import BtcStreamerAdapter
59+
from blockchainetl.streaming.streamer import Streamer
5360

54-
do_stream(
61+
streamer_adapter = BtcStreamerAdapter(
5562
bitcoin_rpc=ThreadLocalProxy(lambda: BitcoinRpc(provider_uri)),
63+
item_exporter=get_item_exporter(output),
64+
chain=chain,
65+
batch_size=batch_size,
66+
max_workers=max_workers
67+
)
68+
streamer = Streamer(
69+
blockchain_streamer_adapter=streamer_adapter,
5670
last_synced_block_file=last_synced_block_file,
5771
lag=lag,
58-
item_exporter=get_item_exporter(output),
5972
start_block=start_block,
60-
chain=chain,
6173
period_seconds=period_seconds,
62-
batch_size=batch_size,
6374
block_batch_size=block_batch_size,
64-
max_workers=max_workers
75+
pid_file=pid_file,
6576
)
77+
streamer.stream()

bitcoinetl/streaming/btc_streamer_adapter.py

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# MIT License
2+
#
3+
# Copyright (c) 2018 Evgeny Medvedev, evge.medvedev@gmail.com
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in all
13+
# copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
23+
24+
import logging
25+
26+
from bitcoinetl.enumeration.chain import Chain
27+
from bitcoinetl.jobs.enrich_transactions import EnrichTransactionsJob
28+
from bitcoinetl.jobs.export_blocks_job import ExportBlocksJob
29+
from bitcoinetl.service.btc_service import BtcService
30+
from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter
31+
from blockchainetl.jobs.exporters.in_memory_item_exporter import InMemoryItemExporter
32+
33+
34+
class BtcStreamerAdapter:
    """Adapter that exports blocks and enriched transactions for a block range.

    The CLI ``stream`` command builds one instance and hands it to
    ``Streamer`` as ``blockchain_streamer_adapter``. The adapter exposes
    ``open``, ``get_current_block_number``, ``export_all`` and ``close``.
    NOTE(review): the order in which ``Streamer`` calls these is not visible
    here -- presumably ``open`` first and ``close`` last; confirm against
    ``blockchainetl.streaming.streamer``.

    Args:
        bitcoin_rpc: RPC client passed to ``BtcService`` and to the export
            and enrich jobs.
        item_exporter: Destination exporter; it must provide ``open``,
            ``export_items`` and ``close``. When ``None`` (the default) a new
            ``ConsoleItemExporter`` is created for this instance.
        chain: Chain identifier forwarded to the service and the jobs.
        batch_size: Batch size forwarded to both jobs.
        max_workers: Worker count forwarded to both jobs.
    """

    def __init__(
            self,
            bitcoin_rpc,
            item_exporter=None,
            chain=Chain.BITCOIN,
            batch_size=2,
            max_workers=5):
        self.bitcoin_rpc = bitcoin_rpc
        self.chain = chain
        self.btc_service = BtcService(bitcoin_rpc, chain)
        # Create the fallback exporter per instance. A default written as
        # ``item_exporter=ConsoleItemExporter()`` is evaluated once, when the
        # function is defined, and would be shared by every adapter.
        self.item_exporter = ConsoleItemExporter() if item_exporter is None else item_exporter
        self.batch_size = batch_size
        self.max_workers = max_workers

    def open(self):
        """Open the destination item exporter."""
        self.item_exporter.open()

    def get_current_block_number(self):
        """Return the number of the latest block reported by the service, as an int."""
        return int(self.btc_service.get_latest_block().number)

    def export_all(self, start_block, end_block):
        """Export blocks and enriched transactions for the given block range.

        Blocks and transactions are first collected in memory, the
        transactions are then enriched, and finally everything is pushed to
        the destination exporter in a single ``export_items`` call.

        Args:
            start_block: First block of the range, forwarded to ``ExportBlocksJob``.
            end_block: Last block of the range, forwarded to ``ExportBlocksJob``.
                NOTE(review): whether the range is inclusive is decided by
                ``ExportBlocksJob`` -- confirm there.

        Raises:
            ValueError: If enrichment yields a different number of
                transactions than were exported.
        """
        # Export blocks and transactions
        blocks_and_transactions_item_exporter = InMemoryItemExporter(item_types=['block', 'transaction'])

        blocks_and_transactions_job = ExportBlocksJob(
            start_block=start_block,
            end_block=end_block,
            batch_size=self.batch_size,
            bitcoin_rpc=self.bitcoin_rpc,
            max_workers=self.max_workers,
            item_exporter=blocks_and_transactions_item_exporter,
            chain=self.chain,
            export_blocks=True,
            export_transactions=True
        )
        blocks_and_transactions_job.run()

        blocks = blocks_and_transactions_item_exporter.get_items('block')
        transactions = blocks_and_transactions_item_exporter.get_items('transaction')

        # Enrich transactions
        enriched_transactions_item_exporter = InMemoryItemExporter(item_types=['transaction'])

        enrich_transactions_job = EnrichTransactionsJob(
            transactions_iterable=transactions,
            batch_size=self.batch_size,
            bitcoin_rpc=self.bitcoin_rpc,
            max_workers=self.max_workers,
            item_exporter=enriched_transactions_item_exporter,
            chain=self.chain
        )
        enrich_transactions_job.run()
        enriched_transactions = enriched_transactions_item_exporter.get_items('transaction')

        # Refuse to publish a partial batch. Report the two counts and the
        # block range rather than stringifying every transaction, which made
        # the message huge without stating the actual mismatch.
        if len(enriched_transactions) != len(transactions):
            raise ValueError(
                'The number of transactions is wrong: exported {}, enriched {} (blocks {}-{})'.format(
                    len(transactions), len(enriched_transactions), start_block, end_block))

        logging.info('Exporting with %s', type(self.item_exporter).__name__)
        self.item_exporter.export_items(blocks + enriched_transactions)

    def close(self):
        """Close the destination item exporter."""
        self.item_exporter.close()

bitcoinetl/streaming/stream.py

-145
This file was deleted.

blockchainetl/jobs/exporters/console_item_exporter.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
# SOFTWARE.
2222

2323
import json
24-
import logging
2524

2625

2726
class ConsoleItemExporter:
@@ -33,7 +32,7 @@ def export_items(self, items):
3332
self.export_item(item)
3433

3534
def export_item(self, item):
36-
logging.info(json.dumps(item))
35+
print(json.dumps(item))
3736

3837
def close(self):
3938
pass

0 commit comments

Comments
 (0)