Install the package:
pip install sqs-client
from sqs_client.client import SQSClient
sqs_client = SQSClient()


# Subscribe to an SQS queue.
# NOTE(review): wait_time_seconds and visibility_timeout presumably correspond
# to the SQS receive settings of the same names (both in seconds) -- confirm
# against the library.
@sqs_client.task(
    queue_name="sqs-queue-name",
    wait_time_seconds=20,
    visibility_timeout=300,
    daemon=False,
)
def test_task(message):
    """Print the message this task was called with."""
    print("test_task received:", message)
from sqs_client.client import SQSClient
from sqs_client.publisher import Publisher
sqs_client = SQSClient()

# Option 1: publish directly through the client, naming the queue per call.
sqs_client.publish(queue_name="sqs-queue-name", message="test message")

# Option 2: create a Publisher for the queue, then publish through it.
publisher = Publisher(sqs_client=sqs_client, queue_name="sqs-queue-name")
publisher.publish("test message")
Lazy mode: a faster way to subscribe to a queue and publish messages to SQS.
from sqs_client.client import SQSClient
sqs_client = SQSClient()


# Subscribe to an SQS queue with lazy=True.
@sqs_client.task(
    queue_name="sqs-queue-name",
    lazy=True,
    daemon=False,
    wait_time_seconds=20,
    visibility_timeout=300,
)
def test_task(message, abc):
    """Print the received message and the extra ``abc`` argument."""
    print("test_task received message:", message)
    print("test_task received abc:", abc)


# Publish a message: trigger() is called with the same arguments the task
# function declares (message, abc).
test_task.trigger("Test message", abc=1)
Publish a lazy-mode message without subscribing:
from sqs_client.client import SQSClient
from sqs_client.publisher import Publisher
sqs_client = SQSClient()
publisher = Publisher(sqs_client=sqs_client, queue_name="sqs-queue-name")

# Publish in lazy mode; the keyword argument (abc=1) is passed along with the message.
publisher.publish_lazy("Test lazy message", abc=1)
This project is Copyright (c) 2023 and onwards Digital Fortress. It is free software and may be redistributed under the terms specified in the LICENSE file.
This project is made and maintained by Digital Fortress.
We are an experienced team in R&D, software, hardware, cross-platform mobile and DevOps.
See more of our projects. Do you need help completing one of your own?