Commit 42dc4978 authored by Nane Kratzke's avatar Nane Kratzke
Browse files

Queueing

parent 610d27d1
...@@ -144,7 +144,7 @@ for since, msg in queue.listen(since=0): ...@@ -144,7 +144,7 @@ for since, msg in queue.listen(since=0):
print(msg) print(msg)
``` ```
Deployen Sie dann den Consumer in Gitlab erneut. Beobachten Sie nun in Lens das Verhalten im Pod Log des neuen Consumers. Sie sollten feststellen, dass einige Messages sehr schnell einfliegen, und sich dann irgendwann des Tempo verlangsamt. Woher kommt das? Es werden nun alle Messages abgerufen (seit Anbeginn der Queue). In Übung 01 wären diese Messages einfach "verschwunden". Committen Sie diese Änderung in das Repo und deployen Sie dann den Consumer mittels der Gitlab Pipeline erneut. Beobachten Sie nun in Lens das Verhalten im Pod Log des neuen Consumers. Sie sollten feststellen, dass einige Messages sehr schnell einfliegen, und sich dann irgendwann des Tempo verlangsamt. Woher kommt das? Es werden nun alle Messages abgerufen (seit Anbeginn der Queue). In Übung 01 wären diese Messages einfach "verschwunden".
Sie können auch ab einem beliebigen Zeitpunkt aufsetzen. Schauen Sie doch einfach in Ihr Log des Consumers, um sich einen beliebigen Aufsetzpunkt zu wählen. Sie können auch ab einem beliebigen Zeitpunkt aufsetzen. Schauen Sie doch einfach in Ihr Log des Consumers, um sich einen beliebigen Aufsetzpunkt zu wählen.
...@@ -156,7 +156,7 @@ Sie können auch ab einem beliebigen Zeitpunkt aufsetzen. Schauen Sie doch einfa ...@@ -156,7 +156,7 @@ Sie können auch ab einem beliebigen Zeitpunkt aufsetzen. Schauen Sie doch einfa
1319.34ms | msg b'1614769668100-0': {'text': 'Hi, this is message #629 from queueing-producer-64b575bb98-wssf2.'} 1319.34ms | msg b'1614769668100-0': {'text': 'Hi, this is message #629 from queueing-producer-64b575bb98-wssf2.'}
``` ```
Sie könnten dann z.B. eingeben, um ab der Message `#628` weiter zu lesen. Um z.B. ab der Message `#628` weiter zu lesen, könnten dann folgendes eingeben:
```Python ```Python
for since, msg in queue.listen(since=1614769663096): for since, msg in queue.listen(since=1614769663096):
...@@ -165,16 +165,51 @@ for since, msg in queue.listen(since=1614769663096): ...@@ -165,16 +165,51 @@ for since, msg in queue.listen(since=1614769663096):
Committen Sie, pushen Sie in die Gitlab Pipeline und redeployen anschließend den Consumer erneut. Sie werden feststellen, die Logs setzen ab Message `#628` fort. Committen Sie, pushen Sie in die Gitlab Pipeline und redeployen anschließend den Consumer erneut. Sie werden feststellen, die Logs setzen ab Message `#628` fort.
```
queueing-consumer-66cc6f8cfb-v55w8 has found already existing consumer group for xqueue
594444.00ms | msg b'1614769664098-0': {'text': 'Hi, this is message #628 from queueing-producer-64b575bb98-wssf2.'}
590442.67ms | msg b'1614769668100-0': {'text': 'Hi, this is message #629 from queueing-producer-64b575bb98-wssf2.'}
588439.98ms | msg b'1614769670102-0': {'text': 'Hi, this is message #630 from queueing-producer-64b575bb98-wssf2.'}
583434.14ms | msg b'1614769675108-0': {'text': 'Hi, this is message #631 from queueing-producer-64b575bb98-wssf2.'}
581431.20ms | msg b'1614769677111-0': {'text': 'Hi, this is message #632 from queueing-producer-64b575bb98-wssf2.'}
579428.24ms | msg b'1614769679114-0': {'text': 'Hi, this is message #633 from queueing-producer-64b575bb98-wssf2.'}
...
```
__Transferaufgabe:__
- Schreiben Sie den Consumer so um, dass er sich den Zeitpunkt der letzten von Ihm verarbeiteten Nachricht merkt (Achtung Stateful! Sie brauchen dann wohl ein Volume oder Sie speichern sich dies in Redis selber!), damit er bei einem Restart an exakt dieser Stelle mit der Verarbeitung des Event Streams fortsetzen kann.
- Passen Sie ggf. die Deployment Pipeline und das Deployment Manifest an (falls Sie ein Volume nutzen wollen).
Sie können auch gerne mehrere Consumer starten. Sie werden feststellen, dass diese alle ab derselben Stelle mit der Bearbeitung fortsetzen werden. D.h. die Messages gehen immer noch wie in Übung 01 und 02 an alle Consumer.
Was können wir machen, damit Messages über mehrere Consumer gleichmäßig verteilt werden, um Arbeit über mehrere Instanzen verteilen zu können?
## Übung 04: Queueing ## Übung 04: Queueing
Beim Queueing Kommunikationsmuster erzeugt ein Producer *P* Messages, die wie gehabt in einen Channel einer Message Queue gegeben werden (Publish). Diese Queue verteilt dann allerdings alle Messages an mehrere Consumer *C1, C2, C3, ..* einer Gruppe, die sich für diesen Channel angemeldet haben (Subscribe). So gehen nicht alle Messages an alle Consumer, sondern werden innerhalb der Gruppe verteilt. Weiterhin müssen sich die Consumer untereinander nicht kennen und der Producer die Consumer ebenfalls nicht.
``` ```
+--M4,M1-> C2 +--M4,M1-> C1
| |
P --M4,M3,M2,M1--+-----M2-> C3 P --M4,M3,M2,M1--+-----M2-> C2
| |
+-----M3-> C4 +-----M3-> C3
``` ```
Messaging Systeme benötigen hierfür zusätzlich die Information welche Consumer sich zu einer Gruppe zusammenfinden. Für die Subscription ist also auf Seiten des Consumers zusätzlich die Angabe einer Gruppe erforderlich, damit das Messaging System Messages innerhalb der Gruppe auch verteilen kann.
Um dies auszuprobieren, können Sie in der `messaging/queueing-consumer.py` die `listen_as_group()`-Methode nutzen.
```Python
for since, msg in queue.listen_as_group("consumers"):
print(msg)
```
Committen Sie diese Änderung in das Repo und deployen Sie dann den Consumer mittels der Gitlab Pipeline erneut. Beobachten Sie nun in Lens das Verhalten im Pod Log des neuen Consumers. Auf den ersten Blick werden Sie keine Änderung feststellen. Das Verhalten des Consumers ist wie in Übung 02.
## Verständnis- und Transferfragen ## Verständnis- und Transferfragen
- Für welche Anwendungsfälle würden Sie das PubSub-Kommunikationsmuster einsetzen? - Für welche Anwendungsfälle würden Sie das PubSub-Kommunikationsmuster einsetzen?
......
...@@ -8,7 +8,7 @@ channel = os.environ.get("CHANNEL", "xqueue") ...@@ -8,7 +8,7 @@ channel = os.environ.get("CHANNEL", "xqueue")
queue = MQueue(channel, redis.Redis(host=host, port=port, db=0)) queue = MQueue(channel, redis.Redis(host=host, port=port, db=0))
for since, msg in queue.listen(since=1614769663096): for since, msg in queue.listen_as_group("consumers"):
# We would normally process the messages here. # We would normally process the messages here.
# However, to not overflow the log we skip this here. # However, to not overflow the log we skip this here.
# You can print the message, if you want. # You can print the message, if you want.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment