[JOBS]: How to correctly connect RR v2023.3 to a Kafka broker #1791
-
composer.json:
{
"name": "complayer",
"description": "Compatibility layer for SE Collector with the Saga pattern on Temporal",
"type": "project",
"minimum-stability": "dev",
"prefer-stable": true,
"require": {
"spiral/roadrunner": "^2023.3",
"spiral/roadrunner-jobs": "^4.2.0",
"spiral/roadrunner-worker": "^3.0",
"temporal/sdk": "2.7.x",
"temporal/open-telemetry-interceptors": "dev-master",
"open-telemetry/exporter-otlp": "^0.0.17",
"open-telemetry/transport-grpc": "^0.0.17",
"symfony/dotenv": "^6.2",
"spiral/tokenizer": "^3.8",
"symfony/console": "^6.3",
"google/protobuf": "^3.21",
"grpc/grpc": "^1.42",
"php": ">=8.1.0",
"ext-grpc": "*",
"ext-curl": "*",
"guzzlehttp/promises": "^2.0",
"php-http/httplug": "^2.4",
"nyholm/psr7": "^1.8"
},
"require-dev": {
"phpunit/phpunit": "^9.6.11"
},
"license": "proprietary",
"autoload": {
"psr-4": {
"Collector\\": "grpc/Collector",
"GPBMetadata\\": [
"grpc/GPBMetadata"
],
"Models\\": [
"grpc/Models"
],
"Complayer\\": "src/"
}
},
"config": {
"allow-plugins": {
"php-http/discovery": true
}
}
}
Sample RoadRunner configuration:
version: "3"
server:
relay: pipes
#command: "/usr/bin/php -d xdebug.mode=debug -d xdebug.start_with_request=yes -d xdebug.client_host=host.docker.internal -d xdebug.client_port=\"9008\" /app/consumer.php"
command: "/usr/bin/php /app/consumer.php"
kafka:
brokers: ["kafka"]
jobs:
consume: ["complayer-queue"]
num_pollers: 1
pipeline_size: 1000
pool:
num_workers: 1
allocate_timeout: 60s
pipelines:
complayer-queue:
driver: kafka
config:
consumer_options:
topics: ["se-collector.pt.event.out"]
group_options:
group_id: "complayerv2.0-test-consumer"
temporal:
address: temporal:7233
activities:
num_workers: 1
rpc:
listen: tcp://127.0.0.1:6001
consumer.php:
<?php
use Spiral\RoadRunner\Jobs\Task\ReceivedTaskInterface;
use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\RoadRunner;
use Spiral\RoadRunner\Worker;
use Temporal\Client\WorkflowClient;
require_once __DIR__.'/vendor/autoload.php';
// Running count of consumed events; only referenced by the (currently
// disabled) payload-decoding snippet inside the loop below.
$totalEventConsumed = 0;

// Worker-side entry point of the RoadRunner Jobs plugin.
$consumer = new \Spiral\RoadRunner\Jobs\Consumer();

// Passed to fail(): whether RoadRunner should re-queue a task that threw.
$shouldBeRestarted = false;

// The server is configured with `relay: pipes`, so STDOUT is the channel the
// worker uses to talk to RoadRunner. Anything printed with echo/var_dump would
// be injected into that stream, so every diagnostic message goes to STDERR.
fwrite(\STDERR, "Consumer was created ..." . PHP_EOL);

/** @var ReceivedTaskInterface $task */
while (($task = $consumer->waitTask()) !== null) {
    try {
        // Dump the received task for debugging (STDERR, see note above).
        fwrite(\STDERR, print_r($task, true) . PHP_EOL);

        /*$payload = $task->getPayload();
        if(empty($payload['Data'])){
            continue;
        }
        $eventData = json_decode(base64_decode($payload['Data']), true);
        $totalEventConsumed++;*/

        // Acknowledge the task so it is not delivered again.
        $task->complete();
    } catch (\Throwable $e) {
        // Report the failure back to RoadRunner instead of crashing the worker.
        $task->fail($e, $shouldBeRestarted);
    }
}

// Reached once waitTask() returns null, i.e. the worker was asked to stop.
fwrite(\STDERR, "Events were consumed ..." . PHP_EOL);
but I remember that in the previous version |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 19 replies
-
Hey @denyskanunnikov 👋
So, there are no problems with the connection to the Kafka broker. The issue is that you consumed Temporal data as a JOB message. |
Beta Was this translation helpful? Give feedback.
-
@rustatian thank you very much for the explanation! |
Beta Was this translation helpful? Give feedback.
-
my issue was solved |
Beta Was this translation helpful? Give feedback.
Hi,
You can freely connect to the Temporal server from any of the workers (well, except Workflow workers; these workers are special).
Implement a handler that communicates with Temporal using WorkflowClient. You can start workflows or send signals to a running workflow; there are no limitations.