Commit 92012ec7 authored by Nane Kratzke's avatar Nane Kratzke
Browse files

Restructured messaging

parent 129992d4
......@@ -21,7 +21,7 @@ volumes:
script:
- kubectl apply -f volumes/redis-pvc.yaml
consumer:
container:
stage: build
image:
name: gcr.io/kaniko-project/executor:debug
......@@ -29,29 +29,13 @@ consumer:
only:
changes:
- .gitlab-ci.yml
- consumer/*
- consumer/*/*
- messaging/*
- messaging/*/*
script:
- mkdir -p /kaniko/.docker
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"username\":\"$CI_REGISTRY_USER\",\"password\":\"$CI_REGISTRY_PASSWORD\"}}}" > /kaniko/.docker/config.json
- cd consumer
- /kaniko/executor --context . --destination $CI_REGISTRY_IMAGE/consumer:latest
producer:
stage: build
image:
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
only:
changes:
- .gitlab-ci.yml
- producer/*
- producer/*/*
script:
- mkdir -p /kaniko/.docker
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"username\":\"$CI_REGISTRY_USER\",\"password\":\"$CI_REGISTRY_PASSWORD\"}}}" > /kaniko/.docker/config.json
- cd producer
- /kaniko/executor --context . --destination $CI_REGISTRY_IMAGE/producer:latest
- cd messaging
- /kaniko/executor --context . --destination $CI_REGISTRY_IMAGE:latest
redis:
stage: deploy
......@@ -62,12 +46,28 @@ redis:
pubsub-consumer:
stage: deploy
when: manual
script:
- mo deploy/pubsub-consumer-dep.yaml | kubectl delete -f - || true
- mo deploy/pubsub-consumer-dep.yaml | kubectl apply -f -
pubsub-producer:
stage: deploy
when: manual
script:
- mo deploy/pubsub-producer-dep.yaml | kubectl delete -f - || true
- mo deploy/pubsub-producer-dep.yaml | kubectl apply -f -
queueing-consumer:
stage: deploy
when: manual
script:
- mo deploy/queueing-consumer-dep.yaml | kubectl delete -f - || true
- mo deploy/queueing-consumer-dep.yaml | kubectl apply -f -
queueing-producer:
stage: deploy
when: manual
script:
- mo deploy/queueing-producer-dep.yaml | kubectl delete -f - || true
- mo deploy/queueing-producer-dep.yaml | kubectl apply -f -
......@@ -10,7 +10,9 @@ Die in diesem Lab am Beispiel von Redis vermittelten Prinzipien sind aber durcha
- [Inhalt](#inhalt)
- [Vorbereitung](#vorbereitung)
- [Übung 01: Pub/Sub Messaging (Fan Out)](#übung-01-pubsub-messaging-fan-out)
- [Übung 02: Queueing](#übung-02-queueing)
- [Übung 02: Persistentes Pub/Sub Messaging](#übung-02-persistentes-pubsub-messaging)
- [Übung 03: Event Sourcing](#übung-03-event-sourcing)
- [Übung 04: Queueing](#übung-04-queueing)
- [Verständnis- und Transferfragen](#verständnis--und-transferfragen)
- [Links](#links)
- [Was sollten Sie mitnehmen](#was-sollten-sie-mitnehmen)
......@@ -27,7 +29,11 @@ P --M3,M2,M1--+--M3,M2,M1-> C
+--M3,M2,M1-> C
```
## Übung 02: Queueing
## Übung 02: Persistentes Pub/Sub Messaging
## Übung 03: Event Sourcing
## Übung 04: Queueing
```
+--M4,M1-> C2
......@@ -41,6 +47,8 @@ P --M4,M3,M2,M1--+-----M2-> C3
- Für welche Anwendungsfälle würden Sie das PubSub-Kommunikationsmuster einsetzen?
- Für welche Anwendungsfälle würden Sie das Queueing-Kommunikationsmuster einsetzen?
- Welche Vor- und Nachteile hat die Persistierung von Message Queues?
- Erklären Sie Event Sourcing in eigenen Worten.
- Vergleichen Sie die Latenzzeiten bei PubSub und Queue? Gibt es nennenswerte Unterschiede?
- Vergleichen Sie die Latenzzeiten dieses Labs mit dem gRPC-Lab? Welche Technologie ist grundsätzlich performanter? Woher kommt das?
- Wie stufen Sie den Grad der Kopplung bei Messaging/Streaming im Vergleich zu gRPC ein?
......@@ -61,7 +69,7 @@ P --M4,M3,M2,M1--+-----M2-> C3
## Was sollten Sie mitnehmen
- Obwohl Redis eigentlich eine In-Memory Key-Value Datenbank ist, kann Sie (für viele überraschend) auch für (einfaches) Messaging eingesetzt werden. Gegenüber komplexeren Lösungen wie bspw. Kafka kann dies durchaus von Vorteil sein.
- Das Kommunikationsmuster Publish-Subscribe kann für vor allem für Fan-Out Anwendungsfälle eingesetzt werden, d.h. bei der Nachrichten an ALLE Consumer gehen.
- Das Kommunikationsmuster Publish-Subscribe kann für vor allem für Fan-Out Anwendungsfälle eingesetzt werden, d.h. bei der Nachrichten an ALLE Consumer gehen. Wenn Publish-Subscribe Messaging persistiert wird, kann es zum [Event Sourcing](https://de.wikipedia.org/wiki/Event_Sourcing) verwendet werden.
- Das Kommunikationsmuster Queueing wird vor allem für Load Balancing eingesetzt werden, bei der Nachrichten über mehrere Consumer zur Verarbeitung verteilt und dort verarbeitet werden müssen. Sowohl Producer als auch Consumer können dabei unabhängig von einander skaliert werden.
- Messaging ist hinsichtlich Übertragungslatenzen meist performanter als gRPC, gRPC ist üblicherweise performanter als REST.
- Sowohl REST als auch Messaging ermöglichen eine lose Kopplung von Services.
......
......@@ -16,4 +16,5 @@ spec:
- name: gitlab-registry-credentials
containers:
- name: pubsub-consumer
image: {{ CI_REGISTRY_IMAGE }}/consumer:latest
image: {{ CI_REGISTRY_IMAGE }}:latest
command: ["python", "-u", "pubsub-consumer.py"]
......@@ -16,4 +16,5 @@ spec:
- name: gitlab-registry-credentials
containers:
- name: pubsub-producer
image: {{ CI_REGISTRY_IMAGE }}/producer:latest
image: {{ CI_REGISTRY_IMAGE }}:latest
command: ["python", "-u", "pubsub-producer.py"]
apiVersion: apps/v1
kind: Deployment
metadata:
name: queueing-consumer
spec:
replicas: 1
selector:
matchLabels:
app: queueing-consumer
template:
metadata:
labels:
app: queueing-consumer
spec:
containers:
- name: queueing-consumer
image: {{ CI_REGISTRY_IMAGE }}:latest
command: ["python", "-u", "queueing-consumer.py"]
\ No newline at end of file
apiVersion: apps/v1
kind: Deployment
metadata:
name: queueing-producer
spec:
replicas: 1
selector:
matchLabels:
app: queueing-producer
template:
metadata:
labels:
app: queueing-producer
spec:
containers:
- name: queueing-producer
image: {{ CI_REGISTRY_IMAGE }}:latest
command: ["python", "-u", "queueing-producer.py"]
\ No newline at end of file
import json, socket, time
from redis import Redis
# Redis Wrapper providing convenience methods
# to use Redis as a simple but persisting
# message queueing system.
#
class MQueue:
# Constructor
#
def __init__(self, ch : str, r : Redis):
self.redis = r
self.channel = ch
self.host = socket.gethostname()
# Publish json encodable data into the message queue
#
def publish(self, data : list or dict):
print(f"{ self.host } pushes message { json.dumps(data) } into channel '{ self.channel }'")
self.redis.xadd(self.channel, {
"timestamp": time.time_ns(),
"data": json.dumps(data)
})
# Listen for PubSub communciation patterns on message queues
#
def listen(self, since="$"):
try:
self.redis.xgroup_create(self.channel, "consumers")
print(f"{ self.host } creates consumer group for { self.channel }")
except Exception as ex:
print(f"{ self.host } has found already existing consumer group for { self.channel }")
while True:
messages = self.redis.xread({ self.channel : since }, count=100, block=10000)
if not messages:
return
for entry in messages[0][1]:
ts = time.time_ns()
since, data = entry
latency = (ts - int(data[b'timestamp'].decode('utf8'))) / 1000 / 1000
message = json.loads(data[b'data'].decode('utf8'))
print(f"{latency:.2f}ms | msg { since }: { message }")
yield (since, message)
# Group listen for queueing communication patterns on message queues
#
def listen_as_group(self, group : str, timeout=10000):
try:
self.redis.xgroup_create(self.channel, "consumers")
print(f"{ self.host } creates consumer group for { self.channel }")
except Exception as ex:
print(f"{ self.host } has found already existing consumer group for { self.channel }")
while True:
messages = self.redis.xreadgroup(group, self.host, { self.channel : ">" }, count=1, block=timeout)
if not messages:
return
for entry in messages[0][1]:
ts = time.time_ns()
since, data = entry
print(data)
latency = (ts - int(data[b'timestamp'].decode('utf8'))) / 1000 / 1000
if b'data' in data:
message = json.loads(data[b'data'].decode('utf8'))
print(f"{latency:.2f}ms | msg { since }: { message }")
yield (since, message)
else:
print(f"Could not find payload data in msg { data }")
......@@ -3,7 +3,7 @@ import redis
host = os.environ.get("REDIS_SVC", "redis-svc")
port = int(os.environ.get("REDIS_PORT", "6379"))
channel = os.environ.get("CHANNEL", "xfan")
channel = os.environ.get("CHANNEL", "xpubsub")
r = redis.Redis(host=host, port=port, db=0)
......
......@@ -3,7 +3,7 @@ import redis
host = os.environ.get("REDIS_SVC", "redis-svc")
port = int(os.environ.get("REDIS_PORT", "6379"))
channel = os.environ.get("CHANNEL", "xfan")
channel = os.environ.get("CHANNEL", "xpubsub")
stream = redis.Redis(host=host, port=port, db=0)
......@@ -12,8 +12,8 @@ i = 1
while True:
msg = f"Hi, this is message #{ i } from { me }."
i = i + 1
n = random.randint(1, 3)
time.sleep(n)
n = random.randint(1, 5)
time.sleep(n / 10)
print(f"Publishing message '{ msg }' to channel '{ channel }'")
try:
stream.publish(channel, json.dumps({
......
import os, time, random, json, socket
import redis
from messaging import MQueue
host = os.environ.get("REDIS_SVC", "redis-svc")
port = int(os.environ.get("REDIS_PORT", "6379"))
channel = os.environ.get("CHANNEL", "xqueue")
r = redis.Redis(host=host, port=port, db=0)
me = socket.gethostname()
queue = MQueue(channel, redis.Redis(host=host, port=port, db=0))
for since, msg in queue.listen_as_group(group="consumers"):
print(f"{ since }: { msg }")
while True:
_, result = r.xreadgroup("consumers", me, { channel : ">" }, count=1, block=60000)[0]
ts = time.time_ns()
id, data = result[0]
print(data)
latency = (ts - int(data[b'timestamp'].decode('utf8'))) / 1000 / 1000
message = data[b'message'].decode('utf8')
print(f"{latency:.2f}ms | msg: { message }")
import os, time, random, json, socket
import redis
from messaging import MQueue
host = os.environ.get("REDIS_SVC", "redis-svc")
port = int(os.environ.get("REDIS_PORT", "6379"))
channel = os.environ.get("CHANNEL", "xqueue")
r = redis.Redis(host=host, port=port, db=0)
me = socket.gethostname()
try:
r.xgroup_create(channel, "consumers")
except Exception as ex:
print("Consumers already exists")
queue = MQueue(channel, redis.Redis(host=host, port=port, db=0))
i = 0
while True:
i = i + 1
msg = f"Hi, this is message #{ i } from { me }."
n = random.randint(1, 3)
time.sleep(n)
print(f"Publishing message '{ msg }' to channel '{ channel }'")
msg = f"Hi, this is message #{ i } from { queue.host }."
n = random.randint(1, 10)
time.sleep(n / 10)
try:
r.xadd(channel, {
"timestamp": time.time_ns(),
"message": msg
})
queue.publish({ 'text': msg })
except Exception as ex:
print(f"Could not publish message '{msg}' caused by {ex}")
FROM python:3.9-slim
COPY Requirements.txt /app/Requirements.txt
RUN pip3 install -r /app/Requirements.txt
COPY . /app/
WORKDIR /app
ENTRYPOINT ["python3", "-u", "pubsub.py"]
redis
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment