Commit 800cc74f authored by Nane Kratzke's avatar Nane Kratzke
Browse files

Event Sourcing

parent 91d4d4a3
......@@ -124,9 +124,29 @@ Diese Übung zeigt Ihnen wie man Redis als persistentes Messaging System einsetz
- Studieren Sie anschließend die Klasse `messaging/MQueue.py`. Hier ist Ihnen eine Wrapper-Klasse um Redis gegeben, die die Commands `xadd`, `xread` und `xgroupread` so kapselt, dass diese komfortabel mittels einer `listen()`-Methode für Consumer und mittels einer `publish()`-Methode für Producer genutzt werden kann. *Auf `listen_as_group()` gehen wir in Übung 04 noch genauer ein.*
- Versuchen Sie nun die Dateien `messaging/queueing-producer.py` und `messaging/queueing-consumer.py` nachzuvollziehen, um zu verstehen, wie man die Klasse `MQueue` für ein Publish-Subscribe Pattern einsetzen kann.
- Starten Sie nun in Gitlab manuell die Jobs `queueing-consumer` und `queueing-producer` in der `deploy`-Stage um einen Consumer und einen Producer zu erzeugen.
- Warten Sie ab, bis beide Deployments erzeugt wurden (vollziehen Sie dies in Lens nach).
- Sehen Sie sich dann in Lens die Logs des Consumer Pods und die Logs des Producer Pods an. Sie sehen, das funktinioniert wie in Übung 01 auch.
- Löschen Sie nun das Consuming Deployment (es soll also wirklich kein Consuming Pod laufen). Warten Sie ein paar Minuten und merken Sie sich im Consumer in etwa die Message die zum Zeitpunkt des Löschens erzeugt wurde.
- Deployen Sie dann den Consumer in Gitlab erneut. Beobachten Sie nun in Lens das Verhalten im Pod Log des neuen Consumers. Sie werden feststellen, dass es im Consumer immer noch einen Gap bei den empfangenen Messages gibt. Die Messages während der Consumer down war, werden weiterhin nicht empfangen.
Was soll also der ganze Aufwand mit der Persistierung von Message Streams?
## Übung 03: Event Sourcing
Messaging Systeme werden normalerweise so eingesetzt, dass Consumer ab dem Zeitpunkt auf Events lauschen, ab dem Sie gestartet wurden. Man verfolgt also normalerweise Messages in (naher) Echtzeit und ist nicht an Events der Vergangenheit interessiert. Es kann aber durchaus sinnvoll sein, auch Events der Vergangenheit zu verarbeiten. Z.B. weil eine Teilkomponenten down war (ggf. sogar mehrere Stunden). Dann wäre es schön, wenn man dieses "Backlog" wieder aufarbeiten könnte, einfach indem man die Events noch einmal ablaufen lässt. Dies ist -- vereinfacht ausgedrückt -- [Event Sourcing](http://martinfowler.com/eaaDev/EventSourcing.html) (vgl. auch [Wikipedia](https://de.wikipedia.org/wiki/Event_Sourcing)).
Man kann persistenten Messaging Systemen normalerweise mitteilen, ab welchem Zeitpunkt Events abgerufen werden sollen. Unsere Wrapper-Klasse sieht hierzu den `since`-Parameter vor.
Ergänzen Sie nun in der `messaging/queueing-consumer.py` diesen `since`-Parameter.
```Python
for since, msg in queue.listen(since=0):
print(msg)
```
Deployen Sie dann den Consumer in Gitlab erneut. Beobachten Sie nun in Lens das Verhalten im Pod Log des neuen Consumers. Sie sollten feststellen, dass einige Messages sehr schnell einfliegen, und sich dann irgendwann des Tempo verlangsamt. Woher kommt das? Es werden nun alle Messages abgerufen (seit Anbeginn der Queue). In Übung 01 wären diese Messages einfach "verschwunden".
## Übung 04: Queueing
```
......
......@@ -8,7 +8,7 @@ channel = os.environ.get("CHANNEL", "xqueue")
queue = MQueue(channel, redis.Redis(host=host, port=port, db=0))
for since, msg in queue.listen():
for since, msg in queue.listen(since=0):
# We would normally process the messages here.
# However, to not overflow the log we skip this here.
# You can print the message, if you want.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment