Kihagyás

Message queue

Ezen dokumentum a jelenlegi (2025.07.21.) MQ kommunikáció Python oldali implementációját részletezi. A megvalósítás a stomp.py nevezetű csomaggal történt, ez biztosítja a kapcsolatot a message queue-val a STOMP protokollon keresztül.

1. Az objektumstruktúra

A kommunikáció a stomp.py könyvtáron keresztül történik. Ez kapcsolatok, illetve listener-ek segítségével dolgozik, utóbbiak figyelnek egy vagy több címet, majd reagálnak, amennyiben érkezik oda egy üzenet.

1.1 AsyncListener

Az egyik kihívás az, hogy alapvetően a stomp.py könyvtár nem aszinkron kóddal dolgozik, hanem threading alapú megoldásokat használ, és az üzenetekre pedig szinkron módon reagál. Mivel a jelenlegi megvalósításban nem sok párhuzamosan futó egység dolgozza fel a message queue-t, hanem egyetlen Python folyamat, így célszerű ezen belül aszinkron módon feldolgozni a beérkező üzeneteket.

Erre szolgál megoldásul az AsyncListener osztály. A Listener-hez hasonlóan ez az on_message helyett az aon_message aszinkron method segítségével dolgozza fel az üzeneteket. Implementáció szinten valójában annyi történik, hogy üzenet esetén az on_message method elindítja az aon_message method-ot egy előre megadott event loop-ban, majd amikor az véget ér, minimális tisztítást végez. Az aon_message meghívása alapvetően failsafe módon van megírva, azaz ha hiba történik, akkor a program nem hal el, a hiba meg logolásra kerül (az mq.stomp.listener logger által).

1.2 DispatchListener

Bármilyen destination alapú elkülönítéshez egy józan hozzáállás, ha egy úthoz egy függvényt szeretnénk gyakorlatilag végrehajtani. Ezt valósítja meg a DispatchListener az AsyncListener-re épülve. Az objektum bemenete egy (út, aszinkron kezelési függvény) párokból álló dictionary, ahol a kezelési függvények bemenete a STOMP üzenet Frame objektuma, kimenete pedig nincs.

1.3 endpoint_converison.py

A message queue-nk jelenlegi alkalmazása gyakorlatilag egy endpoint-tükrözés: létező endpoint-okat szeretnénk message queue rendszeren keresztül használni. Ez a fájl ahhoz tartalmaz különböző eszközöket, hogy endpoint-okat egyszerűen át tudjunk alakítani aszinkron kezelési függvényekké.

A simple_pydantic_message_handler_factory olyan kezelési függvényt gyárt egy endpoint függvényből (és annak egyetlen pydantic modell bemeneti osztályából), mely a bemeneti Frame body-jából elkészíti az endpoint bemenetet, majd az endpoint-ot lefuttatva a választ is feldolgozza egy response_handler aszinkron függvény segítségével (ez a factory bemenete). További paraméterekkel megadhatunk egy elővalidációs függvényt, illetve sikeres és sikertelen hibakódokat (ez logolásnál szempontjából célszerű).

A response_sending_message_handler_factory egy olyan kezelési függvény gyárt egy endpoint függvényből, mely egy megadott MQ célpontra elküldi az endpoint válaszát. Az endpoint függvényre, illetve a bemeneti modellre továbbra is szükség van. Emellett kell egy STOMP connection, továbbá egy módszer arra, hogy hogy találjuk meg a válaszcímet (ez lehet egy explicit string, de azt is meg lehet adni, hogy header-ből nyerje ki a rendszer).

1.4 StompClient

Végül de nem utolsó sorban a StompClient objektum, ami inicializálja az egész pipeline-t. A connect method-ja megépíti a kezelési függvényeket megadott endpoint-okból. Amennyiben magát az endpoint route-ot adjuk meg, a FastAPI applikáció objektumból a megfelelő endpoint függvény mellett ki tudjuk nyerni az OK státusz kódot is, ezt használjuk az endpoint conversion függvényhíváshoz.

Emellett hiba esetén is a FastAPI objektumot használjuk, hogy megtaláljuk a megfelelő hibakódot.

Technikai okokból a build_listeners method nem az __init__ része, ugyanis ahhoz, hogy az AsyncListener-t meg tudjuk konstruálni, meg kell adnunk neki a loop-ot, amibe delegálja a feladatokat. Ugyanakkor ez a loop az __init__ meghívásakor még nem feltétlen létezik. Ugyanezen okból a build_listeners az egy FastAPI lifespan függvény segítségével van meghívva.

2. Technikai kérdések

Mindezek után nem feltétlenül világos, hogy ebben az egész pipeline-ban mi szinkron, aszinkron, és mi hol fut. Ebben az is közrejátszik, hogy a stomp.py könyvtár automatikusan felállít egy event loopot saját maga, de ez nem egy async event loop. A szerkezet a következő tehát: - Az AsyncListener egy thread-ben fut, amit a StompConnection automatikusan létrehoz. Továbbá az eredeti method-jai szinkron módon futnak. - Az AsyncListener aon_message methodja aszinkron fut, abban az async event loopban, amit az AsyncListener konstruktorának megadtunk (ez a main thread loop-ja jelen esetben, amin az uvicorn.run is fut)

Mivel a StompConnection automatikusan létrehoz deamon thread-eket, nekünk nem kell foglalkoznunk azzal, hogy magát a „connection-t elindítsuk“ egy új thread-ben, hiszen maga a connection nem is fut, hanem csak a Listener-ek. Továbbá mivel a main thread-ben fut egy asyncio.run_forever az uvicorn által, azzal sem kell törődnünk, hogy a daemon thread-eket életben tartsuk, azok pontosan addig élnek, amíg a main thread-et nem csapjuk le.