訊息元件
訊息元件協助應用程式發送和接收訊息到其他應用程式或透過訊息佇列。
此元件在很大程度上受到 Matthias Noback 關於命令總線的系列部落格文章和 SimpleBus 專案的啟發。
另請參閱
本文說明如何在任何 PHP 應用程式中將 Messenger 功能作為獨立元件使用。請閱讀Messenger:同步與佇列訊息處理文章,以了解如何在 Symfony 應用程式中使用它。
安裝
1
$ composer require symfony/messenger
注意
如果您在 Symfony 應用程式之外安裝此元件,您必須在程式碼中引入 vendor/autoload.php
檔案,以啟用 Composer 提供的類別自動載入機制。請閱讀這篇文章以獲取更多詳細資訊。
概念
- 發送器 (Sender):
- 負責序列化和發送訊息到某個東西。這個東西可以是訊息中介軟體或第三方 API 等。
- 接收器 (Receiver):
- 負責檢索、反序列化訊息並將訊息轉發到處理器。這可以是訊息佇列提取器或 API 端點等。
- 處理器 (Handler):
- 負責使用適用於訊息的業務邏輯來處理訊息。處理器由
HandleMessageMiddleware
中介軟體呼叫。 - 中介軟體 (Middleware):
- 中介軟體可以在訊息通過總線時訪問訊息及其包裝器(封套)。字面意思是「中間的軟體」,這些與應用程式的核心關注點(業務邏輯)無關。相反,它們是適用於整個應用程式並影響整個訊息總線的跨領域關注點。例如:日誌記錄、驗證訊息、啟動交易...。它們也負責呼叫鏈中的下一個中介軟體,這意味著它們可以調整封套,透過向其中新增戳記或甚至替換它,以及中斷中介軟體鏈。中介軟體在原始訊息被調度時以及稍後從傳輸接收到訊息時都會被呼叫。
- 封套 (Envelope):
- Messenger 特有的概念,它透過將訊息包裝在其中,允許透過封套戳記在內部新增有用的資訊,從而在訊息總線內部提供充分的彈性。
- 封套戳記 (Envelope Stamps):
- 您需要附加到訊息的資訊片段:用於傳輸的序列化器上下文、識別已接收訊息的標記或您的中介軟體或傳輸層可能使用的任何種類的中繼資料。
總線 (Bus)
總線用於調度訊息。總線的行為在其有序的中介軟體堆疊中。此元件帶有一組您可以使用的中介軟體。
當將訊息總線與 Symfony 的 FrameworkBundle 一起使用時,會為您配置以下中介軟體
- SendMessageMiddleware(啟用非同步處理,如果您提供記錄器,則記錄訊息的處理)
- HandleMessageMiddleware(呼叫已註冊的處理器)
範例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use App\Message\MyMessage;
use App\MessageHandler\MyMessageHandler;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
$handler = new MyMessageHandler();
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator([
MyMessage::class => [$handler],
])),
]);
$bus->dispatch(new MyMessage(/* ... */));
注意
每個中介軟體都需要實作 MiddlewareInterface。
處理器 (Handlers)
一旦調度到總線,訊息將由「訊息處理器」處理。訊息處理器是一個 PHP 可呼叫物件(即函數或類別的實例),它將為您的訊息執行所需的處理
1 2 3 4 5 6 7 8 9 10 11
namespace App\MessageHandler;
use App\Message\MyMessage;
class MyMessageHandler
{
public function __invoke(MyMessage $message): void
{
// Message processing...
}
}
新增中繼資料到訊息 (封套)
如果您需要向訊息新增中繼資料或某些配置,請使用 Envelope 類別包裝它並新增戳記。例如,要設定訊息通過傳輸層時使用的序列化群組,請使用 SerializerStamp
戳記
1 2 3 4 5 6 7 8 9 10
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
$bus->dispatch(
(new Envelope($message))->with(new SerializerStamp([
// groups are applied to the whole message, so make sure
// to define the group for every embedded object
'groups' => ['my_serialization_groups'],
]))
);
以下是一些與 Symfony Messenger 一起提供的重要的封套戳記
- DelayStamp,延遲處理非同步訊息。
- DispatchAfterCurrentBusStamp,使訊息在目前總線執行後處理。請在Messenger:同步與佇列訊息處理中閱讀更多資訊。
- HandledStamp,一個將訊息標記為由特定處理器處理的戳記。允許訪問處理器傳回值和處理器名稱。
- ReceivedStamp,一個內部戳記,將訊息標記為從傳輸接收。
- SentStamp,一個將訊息標記為由特定發送器發送的戳記。允許從 SendersLocator 訪問發送器 FQCN 和別名(如果可用)。
- SerializerStamp,設定傳輸使用的序列化群組。
- ValidationStamp,設定啟用驗證中介軟體時使用的驗證群組。
- ErrorDetailsStamp,當訊息由於處理器中的例外狀況而失敗時的內部戳記。
- ScheduledStamp,一個將訊息標記為由排程器產生的戳記。這有助於將其與「手動」建立的訊息區分開來。您可以在排程器文件中了解更多資訊。
注意
ErrorDetailsStamp 戳記包含一個 FlattenException,它是導致訊息失敗的例外狀況的表示形式。您可以使用 getFlattenException() 方法取得此例外狀況。此例外狀況已透過 FlattenExceptionNormalizer 正規化,這有助於 Messenger 環境中的錯誤報告。
您在接收封套的中介軟體中,而不是直接處理訊息。因此,您可以檢查封套內容及其戳記,或新增任何內容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use App\Message\Stamp\AnotherStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
class MyOwnMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
if (null !== $envelope->last(ReceivedStamp::class)) {
// Message just has been received...
// You could for example add another stamp.
$envelope = $envelope->with(new AnotherStamp(/* ... */));
} else {
// Message was just originally dispatched
}
return $stack->next()->handle($envelope, $stack);
}
}
如果訊息剛被接收(即,至少有一個 ReceivedStamp
戳記),則上述範例會將訊息轉發到下一個帶有額外戳記的中介軟體。您可以透過實作 StampInterface 來建立您自己的戳記。
如果您想檢查封套上的所有戳記,請使用 $envelope->all()
方法,該方法傳回依類型 (FQCN) 分組的所有戳記。或者,您可以使用 FQCN 作為此方法的第一個參數(例如 $envelope->all(ReceivedStamp::class)
)來迭代特定類型的所有戳記。
注意
如果使用 Serializer 基礎序列化器通過傳輸,則任何戳記都必須使用 Symfony Serializer 元件進行序列化。
傳輸 (Transports)
為了發送和接收訊息,您必須配置傳輸。傳輸將負責與您的訊息中介軟體或第三方通信。
您自己的發送器 (Sender)
假設您已經有一個 ImportantAction
訊息通過訊息總線,並由處理器處理。現在,您也想將此訊息作為電子郵件發送(使用 Mime 和 Mailer 元件)。
使用 SenderInterface,您可以建立自己的訊息發送器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
namespace App\MessageSender;
use App\Message\ImportantAction;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Mime\Email;
class ImportantActionToEmailSender implements SenderInterface
{
public function __construct(
private MailerInterface $mailer,
private string $toEmail,
) {
}
public function send(Envelope $envelope): Envelope
{
$message = $envelope->getMessage();
if (!$message instanceof ImportantAction) {
throw new \InvalidArgumentException(sprintf('This transport only supports "%s" messages.', ImportantAction::class));
}
$this->mailer->send(
(new Email())
->to($this->toEmail)
->subject('Important action made')
->html('<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>')
);
return $envelope;
}
}
您自己的接收器 (Receiver)
接收器負責從來源取得訊息並將其調度到應用程式。
假設您已經在應用程式中使用 NewOrder
訊息處理了一些「訂單」。現在您想與第三方或舊版應用程式整合,但您無法使用 API,並且需要使用包含新訂單的共用 CSV 檔案。
您將讀取此 CSV 檔案並調度 NewOrder
訊息。您需要做的就是編寫您自己的 CSV 接收器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
namespace App\MessageReceiver;
use App\Message\NewOrder;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Serializer\SerializerInterface;
class NewOrdersFromCsvFileReceiver implements ReceiverInterface
{
private $connection;
public function __construct(
private SerializerInterface $serializer,
private string $filePath,
) {
// Available connection bundled with the Messenger component
// can be found in "Symfony\Component\Messenger\Bridge\*\Transport\Connection".
$this->connection = /* create your connection */;
}
public function get(): iterable
{
// Receive the envelope according to your transport ($yourEnvelope here),
// in most cases, using a connection is the easiest solution.
$yourEnvelope = $this->connection->get();
if (null === $yourEnvelope) {
return [];
}
try {
$envelope = $this->serializer->decode([
'body' => $yourEnvelope['body'],
'headers' => $yourEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($yourEnvelope['id']);
throw $exception;
}
return [$envelope->with(new CustomStamp($yourEnvelope['id']))];
}
public function ack(Envelope $envelope): void
{
// Add information about the handled message
}
public function reject(Envelope $envelope): void
{
// In the case of a custom connection
$id = /* get the message id thanks to information or stamps present in the envelope */;
$this->connection->reject($id);
}
}
在同一個總線上的接收器和發送器
為了允許在同一個總線上發送和接收訊息,並防止無限迴圈,訊息總線會將 ReceivedStamp 戳記新增到訊息封套,而 SendMessageMiddleware 中介軟體將知道不應再次將這些訊息路由到傳輸。