# messaging.py
# Committed by Nane Kratzke

import json, socket, time

from redis import Redis

# Redis wrapper providing convenience methods
# to use Redis Streams as a simple but persistent
# message queueing system.
#
class MQueue:
    """Convenience wrapper that uses a Redis stream as a simple, persistent message queue.

    Every message is written with XADD as a stream entry carrying two fields:
    ``timestamp`` (the sender's ``time.time_ns()``) and ``data`` (the JSON
    encoded payload). The reader methods decode exactly that layout.
    """

    # Group name that listen() has always tried to create; kept unchanged so
    # existing setups keep seeing the same group.
    _DEFAULT_GROUP = "consumers"

    def __init__(self, ch: str, r: "Redis"):
        """Bind the queue to a stream.

        Args:
            ch: Name (key) of the Redis stream used as the channel.
            r: Redis client used for all stream commands.
        """
        self.redis = r
        self.channel = ch
        # The host name is used in log output and as the consumer name
        # when reading as part of a consumer group.
        self.host = socket.gethostname()

    def publish(self, data: "list | dict"):
        """Append JSON-encodable data to the stream.

        Args:
            data: Payload; must be serialisable by ``json.dumps``.

        Returns:
            Whatever ``xadd`` returns (the id of the new stream entry).

        Raises:
            TypeError: If ``data`` is not JSON serialisable.
        """
        payload = json.dumps(data)  # encode once, reuse for log and entry
        print(f"{ self.host } pushes message { payload } into channel '{ self.channel }'")
        return self.redis.xadd(self.channel, {
            "timestamp": time.time_ns(),
            "data": payload,
        })

    def listen(self, since="$", timeout=10000):
        """Yield messages for publish/subscribe style consumption (XREAD).

        Every listener receives every message. The generator ends as soon as
        one blocking read returns nothing within ``timeout``.

        Args:
            since: Stream id to start after; ``"$"`` means only entries added
                after the read starts.
            timeout: Maximum blocking time per read, in milliseconds.

        Yields:
            ``(entry_id, message)`` tuples, ``message`` being the decoded JSON
            payload. Entries without a ``data`` field are reported and skipped.
        """
        # NOTE(review): XREAD does not need a consumer group; the best-effort
        # creation is retained only because earlier versions did it.
        self._ensure_group(self._DEFAULT_GROUP, mkstream=False)

        while True:
            messages = self.redis.xread({self.channel: since}, count=100, block=timeout)
            if not messages:
                return

            # Only one stream is requested, so its entries are in messages[0][1].
            for entry in messages[0][1]:
                # Advance the cursor even for skipped entries so they are not
                # fetched again by the next read.
                since = entry[0]
                decoded = self._decode_entry(entry)
                if decoded is not None:
                    yield decoded

    def listen_as_group(self, group: str, timeout=10000):
        """Yield messages for work-queue style consumption (XREADGROUP).

        Messages are distributed among the consumers of ``group``; this
        process reads under its host name as consumer name. The generator
        ends as soon as one blocking read returns nothing within ``timeout``.

        Entries are not acknowledged here; the yielded id can be used by the
        caller to XACK them.

        Args:
            group: Name of the consumer group to join (created if missing).
            timeout: Maximum blocking time per read, in milliseconds.

        Yields:
            ``(entry_id, message)`` tuples, ``message`` being the decoded JSON
            payload. Entries without a ``data`` field are reported and skipped.
        """
        # Create the group that is actually read from, and create the stream
        # as well so that consumers may start before the first publish.
        self._ensure_group(group, mkstream=True)

        while True:
            messages = self.redis.xreadgroup(
                group, self.host, {self.channel: ">"}, count=1, block=timeout
            )
            if not messages:
                return

            for entry in messages[0][1]:
                decoded = self._decode_entry(entry)
                if decoded is not None:
                    yield decoded

    def _ensure_group(self, group, mkstream):
        """Best-effort creation of consumer group ``group``; never raises."""
        try:
            self.redis.xgroup_create(self.channel, group, mkstream=mkstream)
        except Exception as ex:
            # Redis answers BUSYGROUP when the group already exists. Anything
            # else is a real failure and is reported as such instead of being
            # mislabelled; the subsequent read will surface fatal problems.
            if "BUSYGROUP" in str(ex):
                print(f"{ self.host } has found already existing consumer group for { self.channel }")
            else:
                print(f"{ self.host } could not create consumer group for { self.channel }: { ex }")
        else:
            print(f"{ self.host } creates consumer group for { self.channel }")

    @staticmethod
    def _field(fields, name):
        """Return field ``name`` of a stream entry as ``str``, or ``None`` if absent.

        Works for clients returning bytes as well as already decoded strings.
        """
        raw = fields.get(name.encode("utf8"))
        if raw is None:
            raw = fields.get(name)
        if isinstance(raw, bytes):
            return raw.decode("utf8")
        return raw

    def _decode_entry(self, entry):
        """Decode one ``(entry_id, fields)`` pair into ``(entry_id, message)``.

        Logs the delivery latency when the entry carries a timestamp. Returns
        ``None`` (after logging) when the entry has no ``data`` field.
        """
        received = time.time_ns()
        entry_id, fields = entry

        payload = self._field(fields, "data")
        if payload is None:
            print(f"Could not find payload data in msg { fields }")
            return None
        message = json.loads(payload)

        sent = self._field(fields, "timestamp")
        if sent is None:
            # Entry was not written by publish(); latency is unknown.
            print(f"msg { entry_id }: { message }")
        else:
            # Nanoseconds -> milliseconds; compares sender and receiver clocks.
            latency = (received - int(sent)) / 1000 / 1000
            print(f"{latency:.2f}ms | msg { entry_id }: { message }")
        return entry_id, message