Commit 129992d4 authored by Nane Kratzke's avatar Nane Kratzke
Browse files

Queueing

parent a4c17e9f
# Lab 10: Messaging-basierte Interaktion # Lab 10: Messaging-basierte Interaktion
Wir werden Messaging erstaunlicherweise am Beispiel der Datenbank [Redis](https://redis.io) demonstrieren. Wir nutzen Redis vor allem deswegen, weil es sehr einfach zu deployen ist und bereits an anderen Stellen in Labs verwendet wurde und wir so den Technologiestack übersichtlich und die Komplexität überschaubar halten können. Redis wurde zwar primär als In-Memory Key-Value Datenbank entwickelt, hat jedoch mittlerweile sowohl Publish-Subscribe als auch Queueing Features erhalten, die für einfache Anwendungsfälle durchaus ausreichend sind. Als Messaging-Komponenten werden in Cloud-nativen Systemen häufig "klassische" Lösungen wie bspw. [Kafka](https://kafka.apache.org), [Nats](https://nats.io), [RabbitMQ](https://www.rabbitmq.com), [ActiveMQ](https://activemq.apache.org) und weitere [AMQP](https://www.amqp.org)- oder [MQTT](https://mqtt.org)-konforme Lösungen verwendet. Wir werden in diesem Lab Messaging überaschenderweise am Beispiel einer Datenbank -- [Redis](https://redis.io) -- demonstrieren. Wir nutzen Redis vor allem deswegen, weil es sehr einfach zu deployen ist und bereits an anderen Stellen in Labs verwendet wurde und wir so den Technologiestack übersichtlich und die Komplexität überschaubar halten können. Redis wurde zwar primär als In-Memory Key-Value Datenbank entwickelt, hat jedoch mittlerweile sowohl Publish-Subscribe als auch Queueing Features erhalten, die für einfache Anwendungsfälle durchaus ausreichend sind. Als Messaging-Komponenten werden in Cloud-nativen Systemen häufig "klassische" Lösungen wie bspw. [Kafka](https://kafka.apache.org), [Nats](https://nats.io), [RabbitMQ](https://www.rabbitmq.com), [ActiveMQ](https://activemq.apache.org) und weitere [AMQP](https://www.amqp.org)- oder [MQTT](https://mqtt.org)-konforme Lösungen verwendet.
Die in diesem Lab am Beispiel von Redis vermittelten Prinzipien sind aber durchaus auf diese Systeme übertragbar, da die zu Grunde liegenden Kommunikationsmuster Publish-Subscribe und Queueing letztlich von all diesen Messaging-Systemen ermöglicht werden. Die in diesem Lab am Beispiel von Redis vermittelten Prinzipien sind aber durchaus auf diese Systeme übertragbar, da die zu Grunde liegenden Kommunikationsmuster Publish-Subscribe und Queueing letztlich von all diesen Messaging-Systemen ermöglicht werden.
......
import os, time, random, json, socket
import redis
host = os.environ.get("REDIS_SVC", "redis-svc")
port = int(os.environ.get("REDIS_PORT", "6379"))
channel = os.environ.get("CHANNEL", "xqueue")
r = redis.Redis(host=host, port=port, db=0)
me = socket.gethostname()
while True:
_, result = r.xreadgroup("consumers", me, { channel : ">" }, count=1, block=60000)[0]
ts = time.time_ns()
id, data = result[0]
print(data)
latency = (ts - int(data[b'timestamp'].decode('utf8'))) / 1000 / 1000
message = data[b'message'].decode('utf8')
print(f"{latency:.2f}ms | msg: { message }")
import os, time, random, json, socket
import redis
host = os.environ.get("REDIS_SVC", "redis-svc")
port = int(os.environ.get("REDIS_PORT", "6379"))
channel = os.environ.get("CHANNEL", "xqueue")
r = redis.Redis(host=host, port=port, db=0)
me = socket.gethostname()
try:
r.xgroup_create(channel, "consumers")
except Exception as ex:
print("Consumers already exists")
i = 0
while True:
i = i + 1
msg = f"Hi, this is message #{ i } from { me }."
n = random.randint(1, 3)
time.sleep(n)
print(f"Publishing message '{ msg }' to channel '{ channel }'")
try:
r.xadd(channel, {
"timestamp": time.time_ns(),
"message": msg
})
except Exception as ex:
print(f"Could not publish message '{msg}' caused by {ex}")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment