Messenger:同步 & 排隊訊息處理
Messenger 提供訊息總線,能夠傳送訊息,然後在您的應用程式中立即處理,或透過傳輸器 (例如佇列) 傳送訊息,以便稍後處理。若要深入瞭解,請閱讀 Messenger 组件文件。
建立訊息 & 處理器
Messenger 圍繞著您將建立的兩個不同類別:(1) 訊息類別,用於保存資料,以及 (2) 處理器類別,該類別將在分派該訊息時被呼叫。處理器類別將讀取訊息類別並執行一或多個任務。
訊息類別沒有特定的要求,除了它可以被序列化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/Message/SmsNotification.php
namespace App\Message;
class SmsNotification
{
public function __construct(
private string $content,
) {
}
public function getContent(): string
{
return $this->content;
}
}
訊息處理器是一個 PHP 可呼叫物件,建議的建立方式是建立一個具有 AsMessageHandler 屬性,並且具有以訊息類別 (或訊息介面) 類型提示的 __invoke()
方法的類別
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message)
{
// ... do some work - like sending an SMS message!
}
}
提示
您也可以在個別類別方法上使用 #[AsMessageHandler]
屬性。您可以在單個類別中根據需要對任意多個方法使用此屬性,讓您可以將多種類型相關訊息的處理分組。
感謝 自動配置 和 SmsNotification
類型提示,Symfony 知道在分派 SmsNotification
訊息時應呼叫此處理器。大多數情況下,這就是您需要做的全部工作。但是您也可以手動設定訊息處理器。若要查看所有已設定的處理器,請執行
1
$ php bin/console debug:messenger
分派訊息
您準備好了!若要分派訊息 (並呼叫處理器),請注入 messenger.default_bus
服務 (透過 MessageBusInterface
),就像在控制器中一樣
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// src/Controller/DefaultController.php
namespace App\Controller;
use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
class DefaultController extends AbstractController
{
public function index(MessageBusInterface $bus): Response
{
// will cause the SmsNotificationHandler to be called
$bus->dispatch(new SmsNotification('Look! I created a message!'));
// ...
}
}
傳輸器:非同步/排隊訊息
預設情況下,訊息會在分派後立即處理。如果您想要非同步處理訊息,您可以設定傳輸器。傳輸器能夠傳送訊息 (例如到佇列系統),然後透過 Worker 接收它們。Messenger 支援多個傳輸器。
注意
如果您想要使用不受支援的傳輸器,請查看 Enqueue 的傳輸器,它支援 Kafka 和 Google Pub/Sub 等服務。
傳輸器使用 "DSN" 註冊。感謝 Messenger 的 Flex recipe,您的 .env
檔案已經有一些範例。
1 2 3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://127.0.0.1:6379/messages
取消註解您想要的任何傳輸器 (或在 .env.local
中設定)。請參閱 Messenger:同步 & 排隊訊息處理 以取得更多詳細資訊。
接下來,在 config/packages/messenger.yaml
中,讓我們定義一個名為 async
的傳輸器,它使用此設定
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
# or expanded to configure more options
#async:
# dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
# options: []
將訊息路由到傳輸器
現在您已經設定了傳輸器,您可以將訊息設定為傳送到傳輸器,而不是立即處理訊息
1 2 3 4 5 6 7 8 9 10
// src/Message/SmsNotification.php
namespace App\Message;
use Symfony\Component\Messenger\Attribute\AsMessage;
#[AsMessage('async')]
class SmsNotification
{
// ...
}
7.2
#[AsMessage]
屬性在 Symfony 7.2 中引入。
如此一來,App\Message\SmsNotification
將被傳送到 async
傳輸器,並且它的處理器將不會立即被呼叫。在 routing
下未匹配的任何訊息仍將立即處理,即同步處理。
注意
如果您同時使用 YAML/XML/PHP 設定檔和 PHP 屬性設定路由,則設定始終優先於類別屬性。此行為允許您在每個環境的基礎上覆寫路由。
注意
在單獨的 YAML/XML/PHP 檔案中設定路由時,您可以使用部分 PHP 命名空間,例如 'App\Message\*'
來匹配匹配命名空間中的所有訊息。唯一的要求是 '*'
通配符必須放在命名空間的末尾。
您可以將 '*'
用作訊息類別。這將充當任何在 routing
下未匹配的訊息的預設路由規則。這對於確保預設情況下沒有訊息被同步處理非常有用。
唯一的缺點是 '*'
也將應用於使用 Symfony Mailer 發送的電子郵件 (當 Messenger 可用時,它使用 SendEmailMessage
)。如果您的電子郵件不可序列化 (例如,如果它們包含作為 PHP 資源/流的文件附件),這可能會導致問題。
您也可以按其父類別或介面路由類別。或者將訊息傳送到多個傳輸器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// src/Message/SmsNotification.php
namespace App\Message;
use Symfony\Component\Messenger\Attribute\AsMessage;
#[AsMessage(['async', 'audit'])]
class SmsNotification
{
// ...
}
// if you prefer, you can also apply multiple attributes to the message class
#[AsMessage('async')]
#[AsMessage('audit')]
class SmsNotification
{
// ...
}
注意
如果您為子類別和父類別都設定了路由,則兩個規則都會被使用。例如,如果您有一個從 Notification
擴展而來的 SmsNotification
物件,則 Notification
和 SmsNotification
的路由都將被使用。
提示
您可以使用訊息信封上的 TransportNamesStamp 在運行時定義和覆寫訊息正在使用的傳輸器。此戳記將傳輸器名稱陣列作為其唯一引數。有關戳記的更多資訊,請參閱 信封 & 戳記。
訊息中的 Doctrine 實體
如果您需要在訊息中傳遞 Doctrine 實體,最好傳遞實體的主鍵 (或處理器實際需要的任何相關資訊,例如 email
等) 而不是物件 (否則您可能會看到與實體管理器相關的錯誤)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;
class NewUserWelcomeEmail
{
public function __construct(
private int $userId,
) {
}
public function getUserId(): int
{
return $this->userId;
}
}
然後,在您的處理器中,您可以查詢新的物件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;
use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
public function __construct(
private UserRepository $userRepository,
) {
}
public function __invoke(NewUserWelcomeEmail $welcomeEmail): void
{
$user = $this->userRepository->find($welcomeEmail->getUserId());
// ... send an email!
}
}
這保證了實體包含最新的資料。
同步處理訊息
如果訊息不符合任何路由規則,則它不會被傳送到任何傳輸器,並且將立即處理。在某些情況下 (例如當將處理器綁定到不同的傳輸器時),顯式處理此問題更容易或更靈活:透過建立 sync
傳輸器並在那裡 "傳送" 訊息以立即處理
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
# ... other transports
sync: 'sync://'
routing:
App\Message\SmsNotification: sync
建立您自己的傳輸器
如果您需要從不受支援的物件傳送或接收訊息,您也可以建立自己的傳輸器。請參閱 如何建立您自己的 Messenger 傳輸器。
消費訊息 (執行 Worker)
一旦您的訊息被路由,在大多數情況下,您需要 "消費" 它們。您可以使用 messenger:consume
命令來執行此操作
1 2 3 4
$ php bin/console messenger:consume async
# use -vv to see details about what's happening
$ php bin/console messenger:consume async -vv
第一個引數是接收器的名稱 (或服務 ID,如果您路由到自訂服務)。預設情況下,命令將永遠運行:在您的傳輸器上尋找新訊息並處理它們。此命令稱為您的 "worker"。
如果您想要消費來自所有可用接收器的訊息,您可以使用帶有 --all
選項的命令
1
$ php bin/console messenger:consume --all
7.1
--all
選項在 Symfony 7.1 中引入。
--keepalive
選項可用於防止訊息在長時間運行的處理過程中過早地重新傳遞。它將訊息標記為 "處理中",並防止在 Worker 完成處理之前重新傳遞它。
注意
此選項僅適用於受支援的傳輸器,即 Beanstalkd 和 AmazonSQS 傳輸器。
7.2
--keepalive
選項在 Symfony 7.2 中引入。
提示
在開發環境中,如果您正在使用 Symfony CLI 工具,您可以設定 Worker 與 Web 伺服器一起自動運行。您可以在 Symfony CLI Workers 文件中找到更多資訊。
提示
若要正確停止 Worker,請拋出 StopWorkerException 的實例。
部署到生產環境
在生產環境中,有一些重要的事項需要考慮
- 使用 Process Manager (例如 Supervisor 或 systemd) 來保持您的 Worker 運行
- 您需要一個或多個 "worker" 始終運行。若要執行此操作,請使用程序控制系統,例如 Supervisor 或 systemd。
- 不要讓 Worker 永遠運行
- 某些服務 (例如 Doctrine 的
EntityManager
) 會隨著時間的推移消耗更多記憶體。因此,不要讓您的 Worker 永遠運行,而是使用諸如messenger:consume --limit=10
之類的標誌來告訴您的 Worker 僅處理 10 條訊息後退出 (然後程序管理器將建立一個新程序)。還有其他選項,例如--memory-limit=128M
和--time-limit=3600
。 - 停止遇到錯誤的 Worker
- 如果 Worker 依賴項 (例如您的資料庫伺服器) 關閉,或達到逾時,您可以嘗試新增重新連線邏輯,或者如果 Worker 收到太多錯誤,可以使用
messenger:consume
命令的--failure-limit
選項來退出 Worker。 - 在部署時重新啟動 Worker
- 每次部署時,您都需要重新啟動所有 Worker 程序,以便它們看到新部署的程式碼。若要執行此操作,請在部署時運行
messenger:stop-workers
。這將向每個 Worker 發出信號,指示它應該完成當前正在處理的訊息並應優雅地關閉。然後,程序管理器將建立新的 Worker 程序。該命令在內部使用 app 快取 - 因此請確保將其設定為使用您喜歡的适配器。 - 在部署之間使用相同的快取
- 如果您的部署策略涉及建立新的目標目錄,您應該為 cache.prefix_seed 設定選項設定一個值,以便在部署之間使用相同的快取命名空間。否則,
cache.app
池將使用kernel.project_dir
參數的值作為命名空間的基礎,這將導致每次進行新部署時使用不同的命名空間。
優先順序傳輸器
有時,某些類型的訊息應該具有更高的優先順序,並在其他訊息之前處理。為了使這成為可能,您可以建立多個傳輸器並將不同的訊息路由到它們。例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
# queue_name is specific to the doctrine transport
queue_name: high
# for AMQP send to a separate exchange then queue
#exchange:
# name: high
#queues:
# messages_high: ~
# for redis try "group"
async_priority_low:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: low
routing:
'App\Message\SmsNotification': async_priority_low
'App\Message\NewUserWelcomeEmail': async_priority_high
然後,您可以為每個傳輸器運行單獨的 Worker,或指示一個 Worker 按優先順序處理訊息
1
$ php bin/console messenger:consume async_priority_high async_priority_low
Worker 將始終首先尋找在 async_priority_high
上等待的訊息。如果沒有,然後它將消費來自 async_priority_low
的訊息。
限制消費特定佇列
某些傳輸器 (尤其是 AMQP) 具有交換器和佇列的概念。Symfony 傳輸器始終綁定到交換器。預設情況下,Worker 從附加到指定傳輸器的交換器的所有佇列中消費。但是,在某些用例中,您可能希望 Worker 僅從特定佇列中消費。
您可以限制 Worker 僅處理來自特定佇列的訊息
1 2 3 4
$ php bin/console messenger:consume my_transport --queues=fasttrack
# you can pass the --queues option more than once to process multiple queues
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2
注意
若要允許使用 queues
選項,接收器必須實現 QueueReceiverInterface。
檢查每個傳輸器的排隊訊息數量
運行 messenger:stats
命令以了解某些或所有傳輸器的 "佇列" 中有多少訊息
1 2 3 4 5 6 7 8 9
# displays the number of queued messages in all transports
$ php bin/console messenger:stats
# shows stats only for some transports
$ php bin/console messenger:stats my_transport_name other_transport_name
# you can also output the stats in JSON format
$ php bin/console messenger:stats --format=json
$ php bin/console messenger:stats my_transport_name other_transport_name --format=json
7.2
format
選項在 Symfony 7.2 中引入。
注意
為了使此命令起作用,設定的傳輸器的接收器必須實現 MessageCountAwareInterface。
Supervisor 設定
Supervisor 是一個很棒的工具,可以保證您的 Worker 程序始終運行 (即使它由於故障、達到訊息限制或感謝 messenger:stop-workers
而關閉)。例如,您可以透過以下方式在 Ubuntu 上安裝它
1
$ sudo apt-get install supervisor
Supervisor 設定檔通常位於 /etc/supervisor/conf.d
目錄中。例如,您可以在那裡建立一個新的 messenger-worker.conf
檔案,以確保 messenger:consume
的 2 個實例始終在運行
1 2 3 4 5 6 7 8 9 10
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
startretries=10
process_name=%(program_name)s_%(process_num)02d
變更 async
引數以使用您的傳輸方式(或多種傳輸方式)的名稱,並將 user
變更為您伺服器上的 Unix 使用者。
警告
在部署期間,某些東西可能無法使用(例如,資料庫),導致消費者啟動失敗。在這種情況下,Supervisor 將嘗試 startretries
次數來重新啟動命令。請務必變更此設定,以避免命令進入 FATAL 狀態,該狀態將永遠不會再次重新啟動。
每次重新啟動,Supervisor 都會將延遲時間增加 1 秒。例如,如果值為 10
,它將等待 1 秒、2 秒、3 秒等。這給予服務總共 55 秒的時間再次可用。增加 startretries
設定以涵蓋最大預期停機時間。
如果您使用 Redis Transport,請注意每個 worker 都需要一個唯一的消費者名稱,以避免同一個訊息被多個 worker 處理。實現此目的的一種方法是在 Supervisor 設定檔中設定環境變數,然後您可以在 messenger.yaml
中參考該變數(請參閱下方的 Redis 章節)
1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
接下來,告訴 Supervisor 讀取您的設定並啟動您的 workers
1 2 3 4 5 6 7 8 9
$ sudo supervisorctl reread
$ sudo supervisorctl update
$ sudo supervisorctl start messenger-consume:*
# If you deploy an update of your code, don't forget to restart your workers
# to run the new code
$ sudo supervisorctl restart messenger-consume:*
請參閱 Supervisor 文件 以取得更多詳細資訊。
優雅關閉
如果您在專案中安裝了 PCNTL PHP 擴充功能,workers 將處理 SIGTERM
或 SIGINT
POSIX 訊號,以在終止前完成處理當前訊息。
但是,您可能更喜歡使用不同的 POSIX 訊號來進行優雅關閉。您可以透過設定 framework.messenger.stop_worker_on_signals
設定選項來覆寫預設訊號。
在某些情況下,SIGTERM
訊號是由 Supervisor 本身發送的(例如,停止將 Supervisor 作為其 entrypoint 的 Docker 容器)。在這些情況下,您需要在程式設定中新增 stopwaitsecs
鍵(值為所需的寬限期秒數),以便執行優雅關閉
1 2
[program:x]
stopwaitsecs=20
Systemd 設定
雖然 Supervisor 是一個很棒的工具,但它的缺點是您需要系統存取權限才能執行它。Systemd 已成為大多數 Linux 發行版的標準,並且有一個稱為使用者服務的良好替代方案。
Systemd 使用者服務設定檔通常位於 ~/.config/systemd/user
目錄中。例如,您可以建立一個新的 messenger-worker.service
檔案。或者,如果您想要同時運行更多實例,則可以建立 messenger-worker@.service
檔案
1 2 3 4 5 6 7 8 9 10 11 12
[Unit]
Description=Symfony messenger-consume %i
[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
# for Redis, set a custom consumer name for each instance
Environment="MESSENGER_CONSUMER_NAME=symfony-%n-%i"
Restart=always
RestartSec=30
[Install]
WantedBy=default.target
現在,告訴 systemd 啟用並啟動一個 worker
1 2 3 4 5 6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service
# to enable and start 20 workers
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service
如果您變更了服務設定檔,則需要重新載入 daemon
1
$ systemctl --user daemon-reload
要重新啟動所有消費者
1
$ systemctl --user restart messenger-consume@*.service
systemd 使用者實例僅在特定使用者的第一次登入後啟動。消費者通常需要在系統啟動時啟動。啟用使用者的 lingering 功能以啟動該行為
1
$ loginctl enable-linger <your-username>
日誌由 journald 管理,可以使用 journalctl 命令進行處理
1 2 3 4 5 6 7 8
# follow logs of consumer nr 11
$ journalctl -f --user-unit messenger-consume@11.service
# follow logs of all consumers
$ journalctl -f --user-unit messenger-consume@*
# follow all logs from your user services
$ journalctl -f _UID=$UID
請參閱 systemd 文件 以取得更多詳細資訊。
注意
您需要 journalctl
命令的提升權限,或將您的使用者新增到 systemd-journal 群組
1
$ sudo usermod -a -G systemd-journal <your-username>
無狀態 Worker
PHP 的設計是無狀態的,不同請求之間沒有共享資源。在 HTTP 環境中,PHP 在發送回應後會清除所有內容,因此您可以決定不處理可能洩漏記憶體的服務。
另一方面,workers 通常在長時間運行的 CLI 程序中依序處理訊息,這些程序不會在處理單一訊息後結束。請注意服務狀態,以防止資訊和/或記憶體洩漏,因為 Symfony 會將同一個服務實例注入到所有訊息中,從而保留服務的內部狀態。
但是,某些 Symfony 服務(例如 Monolog fingers crossed handler)在設計上會洩漏。Symfony 提供了一個服務重設功能來解決此問題。當在兩個訊息之間自動重設容器時,Symfony 會尋找任何實作 ResetInterface 的服務(包括您自己的服務),並呼叫它們的 reset()
方法,以便它們可以清除其內部狀態。
如果服務不是無狀態的,並且您希望在每個訊息之後重設其屬性,則該服務必須實作 ResetInterface,您可以在 reset()
方法中重設屬性。
如果您不想重設容器,請在運行 messenger:consume
命令時新增 --no-reset
選項。
速率限制傳輸器
有時您可能需要限制訊息 worker 的速率。您可以在傳輸方式上設定速率限制器(需要 RateLimiter component),方法是設定其 rate_limiter
選項
1 2 3 4 5 6
# config/packages/messenger.yaml
framework:
messenger:
transports:
async:
rate_limiter: your_rate_limiter_name
警告
當在傳輸方式上設定速率限制器時,當達到限制時,它將阻止整個 worker。您應確保為受速率限制的傳輸方式設定專用的 worker,以避免其他傳輸方式被阻止。
重試 & 失敗
如果在從傳輸方式消費訊息時拋出例外,它將自動重新發送到傳輸方式以再次嘗試。預設情況下,訊息將重試 3 次,然後再被丟棄或 傳送到失敗傳輸方式。每次重試也會延遲,以防故障是由於暫時性問題造成的。所有這些都可以在每個傳輸方式上設定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
# default configuration
retry_strategy:
max_retries: 3
# milliseconds delay
delay: 1000
# causes the delay to be higher before each retry
# e.g. 1 second delay, 2 seconds, 4 seconds
multiplier: 2
max_delay: 0
# applies randomness to the delay that can prevent the thundering herd effect
# the value (between 0 and 1.0) is the percentage of 'delay' that will be added/subtracted
jitter: 0.1
# override all of this with a service that
# implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
# service: null
7.1
jitter
選項是在 Symfony 7.1 中引入的。
提示
當訊息被重試時,Symfony 會觸發 WorkerMessageRetriedEvent,以便您可以運行自己的邏輯。
注意
感謝 SerializedMessageStamp,訊息的序列化形式會被保存,這可以防止在稍後重試訊息時再次序列化它。
避免重試
有時,處理訊息可能會以您知道是永久性的方式失敗,不應重試。如果您拋出 UnrecoverableMessageHandlingException,則不會重試該訊息。
注意
不會重試的訊息仍會顯示在已設定的失敗傳輸方式中。如果您想避免這種情況,請考慮自行處理錯誤並讓處理常式成功結束。
強制重試
有時,處理訊息必須以您知道是暫時性的方式失敗,並且必須重試。如果您拋出 RecoverableMessageHandlingException,則訊息將永遠重試,並且 max_retries
設定將被忽略。
您可以透過在 RecoverableMessageHandlingException
的建構子中設定 retryDelay
引數來定義自訂重試延遲時間(例如,使用 HTTP 回應中的 Retry-After
標頭中的值)。
7.2
retryDelay
引數和 getRetryDelay()
方法是在 Symfony 7.2 中引入的。
儲存 & 重試失敗訊息
如果訊息失敗,它會被重試多次 (max_retries
),然後將被丟棄。為了避免這種情況發生,您可以改為設定 failure_transport
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
# after retrying, messages will be sent to the "failed" transport
failure_transport: failed
transports:
# ... other transports
failed: 'doctrine://default?queue_name=failed'
在此範例中,如果處理訊息失敗 3 次(預設 max_retries
),則會將其發送到 failed
傳輸方式。雖然您可以使用 messenger:consume failed
來像正常傳輸方式一樣消費它,但您通常會想要手動檢視失敗傳輸方式中的訊息,並選擇重試它們
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
# see all messages in the failure transport with a default limit of 50
$ php bin/console messenger:failed:show
# see the 10 first messages
$ php bin/console messenger:failed:show --max=10
# see only MyClass messages
$ php bin/console messenger:failed:show --class-filter='MyClass'
# see the number of messages by message class
$ php bin/console messenger:failed:show --stats
# see details about a specific failure
$ php bin/console messenger:failed:show 20 -vv
# for each message, this command asks whether to retry, skip, or delete
$ php bin/console messenger:failed:retry -vv
# retry specific messages
$ php bin/console messenger:failed:retry 20 30 --force
# remove a message without retrying it
$ php bin/console messenger:failed:remove 20
# remove messages without retrying them and show each message before removing it
$ php bin/console messenger:failed:remove 20 30 --show-messages
# remove all messages in the failure transport
$ php bin/console messenger:failed:remove --all
如果訊息再次失敗,由於正常的 重試規則,它將重新發送回失敗傳輸方式。一旦達到最大重試次數,訊息將被永久丟棄。
7.2
在 messenger:failed:retry
命令中跳過訊息的選項是在 Symfony 7.2 中引入的
多個失敗的傳輸器
有時,擁有單一的全域 failed transport
設定是不夠的,因為有些訊息比其他訊息更重要。在這些情況下,您可以僅針對特定傳輸方式覆寫失敗傳輸方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml
framework:
messenger:
# after retrying, messages will be sent to the "failed" transport
# by default if no "failed_transport" is configured inside a transport
failure_transport: failed_default
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
failure_transport: failed_high_priority
# since no failed transport is configured, the one used will be
# the global "failure_transport" set
async_priority_low:
dsn: 'doctrine://default?queue_name=async_priority_low'
failed_default: 'doctrine://default?queue_name=failed_default'
failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
如果沒有全域或在傳輸方式層級定義 failure_transport
,則訊息將在重試次數後被丟棄。
失敗命令有一個可選選項 --transport
,用於指定在傳輸方式層級設定的 failure_transport
。
1 2 3 4 5 6 7 8
# see all messages in "failure_transport" transport
$ php bin/console messenger:failed:show --transport=failure_transport
# retry specific messages from "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force
# remove a message without retrying it from "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport
傳輸器設定
Messenger 支援多種不同的傳輸方式類型,每種類型都有自己的選項。選項可以透過 DSN 字串或設定傳遞到傳輸方式。
1 2
# .env
MESSENGER_TRANSPORT_DSN=amqp://127.0.0.1/%2f/messages?auto_setup=false
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
auto_setup: false
在 options
下定義的選項優先於在 DSN 中定義的選項。
AMQP 傳輸器
AMQP 傳輸方式使用 AMQP PHP 擴充功能將訊息發送到 RabbitMQ 等佇列。透過運行以下命令安裝它
1
$ composer require symfony/amqp-messenger
AMQP 傳輸方式 DSN 可能看起來像這樣
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# or use the AMQPS protocol
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages
如果您想使用 TLS/SSL 加密的 AMQP,您還必須提供 CA 憑證。在 amqp.cacert
PHP.ini 設定(例如 amqp.cacert = /etc/ssl/certs
)或 DSN 的 cacert
參數(例如 amqps://127.0.0.1?cacert=/etc/ssl/certs/
)中定義憑證路徑。
TLS/SSL 加密 AMQP 使用的預設埠是 5671,但您可以在 DSN 的 port
參數中覆寫它(例如 amqps://127.0.0.1?cacert=/etc/ssl/certs/&port=12345
)。
注意
預設情況下,傳輸方式將自動建立所需的任何交換器、佇列和綁定金鑰。可以停用此功能,但某些功能可能無法正常運作(例如延遲佇列)。若要不自動建立任何佇列,您可以將傳輸方式設定為 queues: []
。
注意
您可以限制 AMQP 傳輸方式的消費者僅處理來自交換器某些佇列的訊息。請參閱 Messenger:同步和佇列訊息處理。
傳輸方式還有許多其他選項,包括設定交換器、佇列綁定金鑰等的方法。請參閱 Connection 上的文件。
傳輸方式有多個選項
auto_setup
(預設值:true
)- 是否應在發送/取得期間自動建立交換器和佇列。
cacert
- PEM 格式的 CA 憑證檔案路徑。
cert
- PEM 格式的用戶端憑證路徑。
channel_max
- 指定伺服器允許的最高通道號碼。0 表示標準擴充功能限制
confirm_timeout
- 確認逾時時間(以秒為單位);如果未指定,傳輸方式將不會等待訊息確認。注意:0 秒或更長。可以是小數。
connect_timeout
- 連線逾時時間。注意:0 秒或更長。可以是小數。
frame_max
- 伺服器針對連線提出的最大幀大小,包括幀標頭和結束位元組。0 表示標準擴充功能限制(取決於 librabbimq 預設幀大小限制)
heartbeat
- 伺服器想要的連線心跳延遲時間(以秒為單位)。0 表示伺服器不想要心跳。注意,librabbitmq 具有有限的心跳支援,這表示僅在封鎖呼叫期間檢查心跳。
host
- AMQP 服務的主機名稱
key
- PEM 格式的用戶端金鑰路徑。
login
- 用於連線 AMQP 服務的使用者名稱
password
- 用於連線 AMQP 服務的密碼
persistent
(預設值:'false'
)- 連線是否為持久連線
port
- AMQP 服務的埠
read_timeout
- 輸入活動的逾時時間。注意:0 秒或更長。可以是小數。
retry
- (無描述)
sasl_method
- (無描述)
connection_name
- 用於自訂連線名稱(至少需要 PHP AMQP 擴充功能的 1.10 版本)
verify
- 啟用或停用同儕驗證。如果啟用同儕驗證,則伺服器憑證中的通用名稱必須與伺服器名稱相符。預設情況下啟用同儕驗證。
vhost
- 要與 AMQP 服務一起使用的虛擬主機
write_timeout
- 輸出活動的逾時時間。注意:0 秒或更長。可以是小數。
delay[queue_name_pattern]
(預設值:delay_%exchange_name%_%routing_key%_%delay%
)- 用於建立佇列的模式
delay[exchange_name]
(預設值:delays
)- 用於延遲/重試訊息的交換器名稱
queues[name][arguments]
- 額外引數
queues[name][binding_arguments]
- 綁定佇列時要使用的引數。
queues[name][binding_keys]
- 要綁定到此佇列的綁定金鑰(如果有的話)
queues[name][flags]
(預設值:AMQP_DURABLE
)- 佇列旗標
exchange[arguments]
- 交換器的額外引數(例如
alternate-exchange
) exchange[default_publish_routing_key]
- 發佈時要使用的路由金鑰,如果訊息上未指定任何路由金鑰
exchange[flags]
(預設值:AMQP_DURABLE
)- 交換器旗標
exchange[name]
- 交換器的名稱
exchange[type]
(預設值:fanout
)- 交換器的類型
您也可以透過將 AmqpStamp 新增到您的 Envelope,在您的訊息上設定 AMQP 特有的設定
1 2 3 4 5 6 7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...
$attributes = [];
$bus->dispatch(new SmsNotification(), [
new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);
警告
消費者不會顯示在管理面板中,因為此傳輸方式不依賴於會封鎖的 \AmqpQueue::consume()
。擁有封鎖接收器會使 messenger:consume
命令的 --time-limit/--memory-limit
選項以及 messenger:stop-workers
命令效率低下,因為它們都依賴於接收器立即返回的事實,無論它是否找到訊息。消費 worker 負責迭代,直到它收到要處理的訊息和/或直到達到其中一個停止條件。因此,如果 worker 卡在封鎖呼叫中,則無法達到 worker 的停止邏輯。
Doctrine 傳輸器
Doctrine 傳輸方式可用於將訊息儲存在資料庫表格中。透過運行以下命令安裝它
1
$ composer require symfony/doctrine-messenger
Doctrine 傳輸方式 DSN 可能看起來像這樣
1 2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default
格式為 doctrine://<connection_name>
,如果您有多個連線並且想要使用「default」以外的連線。傳輸方式將自動建立一個名為 messenger_messages
的表格。
如果您想變更預設表格名稱,請使用 table_name
選項在 DSN 中傳遞自訂表格名稱
1 2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default?table_name=your_custom_table_name
或者,若要自行建立表格,請將 auto_setup
選項設定為 false
並 產生遷移。
傳輸方式有多個選項
table_name
(預設值:messenger_messages
)- 表格名稱
queue_name
(預設值:default
)- 佇列名稱(表格中的一個欄位,用於將一個表格用於多個傳輸方式)
redeliver_timeout
(預設值:3600
)-
重試佇列中但處於「處理中」狀態的訊息之前的逾時時間(如果 worker 因故停止,則會發生這種情況,最終您應該重試訊息) - 以秒為單位。
注意
將
redeliver_timeout
設定為大於您最慢訊息持續時間的值。否則,某些訊息將在第一個訊息仍在處理時第二次啟動。 auto_setup
- 是否應在發送/取得期間自動建立表格。
使用 PostgreSQL 時,您可以存取以下選項以利用 LISTEN/NOTIFY 功能。與 Doctrine 傳輸方式的預設輪詢行為相比,這允許更高效的方法,因為 PostgreSQL 將在表格中插入新訊息時直接通知 workers。
use_notify
(預設值:true
)- 是否使用 LISTEN/NOTIFY。
check_delayed_interval
(預設值:60000
)- 檢查延遲訊息的間隔(以毫秒為單位)。設定為 0 以停用檢查。
get_notify_timeout
(預設值:0
)- 呼叫
PDO::pgsqlGetNotify
時等待回應的時間長度(以毫秒為單位)。
Beanstalkd 傳輸器
Beanstalkd 傳輸方式將訊息直接發送到 Beanstalkd 工作佇列。透過運行以下命令安裝它
1
$ composer require symfony/beanstalkd-messenger
Beanstalkd 傳輸方式 DSN 可能看起來像這樣
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://127.0.0.1:11300?tube_name=foo&timeout=4&ttr=120
# If no port, it will default to 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://127.0.0.1
傳輸方式有多個選項
tube_name
(預設值:default
)- 佇列名稱
timeout
(預設值:0
)- 訊息預約逾時時間 - 以秒為單位。0 將導致伺服器立即傳回回應,否則將拋出 TransportException。
ttr
(預設值:90
)- 訊息在放回就緒佇列之前要運行的時間 - 以秒為單位。
7.2
在 Symfony 7.2 中新增了使用 --keepalive
選項的 Keepalive 支援。
Redis 傳輸器
Redis 傳輸方式使用 streams 來佇列訊息。此傳輸方式需要 Redis PHP 擴充功能(>=4.3)和正在運行的 Redis 伺服器(^5.0)。透過運行以下命令安裝它
1
$ composer require symfony/redis-messenger
Redis 傳輸方式 DSN 可能看起來像這樣
1 2 3 4 5 6 7 8 9 10 11 12
# .env
MESSENGER_TRANSPORT_DSN=redis://127.0.0.1:6379/messages
# Full DSN Example
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
# Redis Cluster Example
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Unix Socket Example
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
# TLS Example
MESSENGER_TRANSPORT_DSN=rediss://127.0.0.1:6379/messages
# Multiple Redis Sentinel Hosts Example
MESSENGER_TRANSPORT_DSN=redis:?host[redis1:26379]&host[redis2:26379]&host[redis3:26379]&sentinel_master=db
可以透過 DSN 或 messenger.yaml
中傳輸方式下的 options
鍵來設定多個選項
stream
(預設值:messages
)- Redis stream 名稱
group
(預設值:symfony
)- Redis 消費者群組名稱
consumer
(預設值:consumer
)- Redis 中使用的消費者名稱
auto_setup
(預設值:true
)- 是否自動建立 Redis 群組
auth
- Redis 密碼
delete_after_ack
(預設值:true
)- 如果
true
,訊息會在處理後自動刪除 delete_after_reject
(預設值:true
)- 如果
true
,訊息會在被拒絕後自動刪除 lazy
(預設值:false
)- 僅在真正需要連線時才連線
serializer
(預設值:Redis::SERIALIZER_PHP
)- 如何在 Redis 中序列化最終酬載(
Redis::OPT_SERIALIZER
選項) stream_max_entries
(預設值:0
)- stream 將被修剪到的最大條目數。將其設定為足夠大的數字,以避免遺失待處理的訊息
redeliver_timeout
(預設值:3600
)- 在重試由已放棄的消費者擁有的待處理訊息之前的逾時時間(以秒為單位)(如果 worker 因故死掉,則會發生這種情況,最終您應該重試訊息)。
claim_interval
(預設值:60000
)- 應檢查待處理/已放棄訊息以進行聲明的間隔 - 以毫秒為單位
persistent_id
(預設值:null
)- 字串,如果為 null,則連線為非持久連線。
retry_interval
(預設值:0
)- 整數,值以毫秒為單位
read_timeout
(預設值:0
)- 浮點數,值以秒為單位,預設值表示無限
timeout
(預設值:0
)- 連線逾時時間。浮點數,值以秒為單位,預設值表示無限
sentinel_master
(預設值:null
)- 字串,如果為 null 或空字串,則停用 Sentinel 支援
redis_sentinel
(預設值:null
)-
sentinel_master
選項的別名7.1
redis_sentinel
選項是在 Symfony 7.1 中引入的。 ssl
(預設值:null
)-
SSL 上下文選項的對應,用於 TLS 通道。例如,這對於變更測試中 TLS 通道的需求很有用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# config/packages/test/messenger.yaml framework: messenger: transports: redis: dsn: "rediss://127.0.0.1" options: ssl: allow_self_signed: true capture_peer_cert: true capture_peer_cert_chain: true disable_compression: true SNI_enabled: true verify_peer: true verify_peer_name: true
警告
對於相同的 stream
、group
和 consumer
組合,永遠不應有多個 messenger:consume
命令在運行,否則訊息最終可能會被多次處理。如果您運行多個佇列 workers,consumer
可以設定為環境變數,例如 %env(MESSENGER_CONSUMER_NAME)%
,由 Supervisor(如下例)或用於管理 worker 程序的任何其他服務設定。在容器環境中,HOSTNAME
可以用作消費者名稱,因為每個容器/主機只有一個 worker。如果使用 Kubernetes 來協調容器,請考慮使用 StatefulSet
以獲得穩定的名稱。
提示
將 delete_after_ack
設定為 true
(如果您使用單一群組)或定義 stream_max_entries
(如果您可以估計在您的情況下可接受的最大條目數),以避免記憶體洩漏。否則,所有訊息將永遠保留在 Redis 中。
記憶體內傳輸器
in-memory
傳輸方式實際上不會傳遞訊息。相反,它會在請求期間將它們保存在記憶體中,這對於測試很有用。例如,如果您有 async_priority_normal
傳輸方式,您可以在 test
環境中覆寫它以使用此傳輸方式
1 2 3 4 5
# config/packages/test/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: 'in-memory://'
然後,在測試時,訊息不會傳遞到真實的傳輸方式。更好的是,在測試中,您可以檢查在請求期間是否只發送了一條訊息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
class DefaultControllerTest extends WebTestCase
{
public function testSomething(): void
{
$client = static::createClient();
// ...
$this->assertSame(200, $client->getResponse()->getStatusCode());
/** @var InMemoryTransport $transport */
$transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
$this->assertCount(1, $transport->getSent());
}
}
傳輸方式有多個選項
serialize
(布林值,預設值:false
)- 是否序列化訊息。這對於測試額外層很有用,尤其是在您使用自己的訊息序列化器時。
注意
所有 in-memory
傳輸方式將在每個測試後自動重設,在擴充 KernelTestCase 或 WebTestCase 的測試類別中。
Amazon SQS
Amazon SQS 傳輸方式非常適合託管在 AWS 上的應用程式。透過運行以下命令安裝它
1
$ composer require symfony/amazon-sqs-messenger
SQS 傳輸方式 DSN 可能看起來像這樣
1 2 3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://127.0.0.1:9494/messages?sslmode=disable
注意
傳輸方式將自動建立所需的佇列。可以透過將 auto_setup
選項設定為 false
來停用此功能。
提示
在發送或接收訊息之前,Symfony 需要透過在 AWS 中呼叫 GetQueueUrl
API 將佇列名稱轉換為 AWS 佇列 URL。可以透過提供作為佇列 URL 的 DSN 來避免此額外的 API 呼叫。
傳輸方式有多個選項
access_key
- AWS 存取金鑰(必須進行 URL 編碼)
account
(預設值:憑證的擁有者)- AWS 帳戶的識別碼
auto_setup
(預設值:true
)- 佇列是否應在傳送/接收期間自動建立。
buffer_size
(預設值:9
)- 要預先提取的訊息數量
debug
(預設值:false
)- 若為
true
,則記錄所有 HTTP 請求和回應 (會影響效能) endpoint
(預設值:https://sqs.eu-west-1.amazonaws.com
)- SQS 服務的絕對 URL
poll_timeout
(預設值:0.1
)- 等待新訊息的持續時間 (秒)
queue_name
(預設值:messages
)- 佇列名稱
region
(預設值:eu-west-1
)- AWS 區域的名稱
secret_key
- AWS 密鑰 (必須進行 URL 編碼)
session_token
- AWS 工作階段權杖
visibility_timeout
(預設值:佇列的組態)- 訊息將不可見的秒數 (可見性逾時)
wait_time
(預設值:20
)- 長輪詢持續時間 (秒)
注意
wait_time
參數定義 Amazon SQS 在傳送回應之前,應等待佇列中訊息可用的最長持續時間。這有助於減少使用 Amazon SQS 的成本,方法是消除空回應的數量。
poll_timeout
參數定義接收器應等待多久才傳回 null 的持續時間。這可避免封鎖其他接收器被呼叫。
注意
如果佇列名稱的後綴為 .fifo
,AWS 將建立 FIFO 佇列。使用戳記 AmazonSqsFifoStamp 定義訊息群組 ID
和訊息重複資料刪除 ID
。
另一種可能性是啟用 AddFifoStampMiddleware。如果您的訊息實作 MessageDeduplicationAwareInterface,中介軟體將自動新增 AmazonSqsFifoStamp 並設定訊息重複資料刪除 ID
。此外,如果您的訊息實作 MessageGroupAwareInterface,中介軟體將自動設定戳記的訊息群組 ID
。
您可以在專門章節中了解更多關於中介軟體的資訊。
FIFO 佇列不支援為每個訊息設定延遲,重試策略設定中需要 delay: 0
的值。
7.2
使用 `--keepalive` 選項的 Keepalive 支援已在 Symfony 7.2 中新增。
序列化訊息
當訊息傳送至 (和接收自) 傳輸時,它們會使用 PHP 的原生 serialize()
& unserialize()
函數進行序列化。您可以將此全域 (或針對每個傳輸) 變更為實作 SerializerInterface 的服務
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml
framework:
messenger:
serializer:
default_serializer: messenger.transport.symfony_serializer
symfony_serializer:
format: json
context: { }
transports:
async_priority_normal:
dsn: # ...
serializer: messenger.transport.symfony_serializer
messenger.transport.symfony_serializer
是一個內建服務,它使用 Serializer 組件,並且可以用幾種方式進行組態。如果您確實選擇使用 Symfony serializer,您可以透過 SerializerStamp,依個案控制內容 (請參閱信封 & 戳記)。
提示
當將訊息傳送/接收到/從另一個應用程式時,您可能需要更精細地控制序列化程序。使用自訂序列化器可提供該控制。請參閱 SymfonyCasts 的訊息序列化器教學課程 以了解詳細資訊。
執行命令和外部程序
觸發命令
可以透過分派 RunCommandMessage 來觸發任何命令。Symfony 將負責處理此訊息並執行傳遞至訊息參數的命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Console\Messenger\RunCommandMessage;
use Symfony\Component\Messenger\MessageBusInterface;
class CleanUpService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function cleanUp(): void
{
// Long task with some caching...
// Once finished, dispatch some clean up commands
$this->bus->dispatch(new RunCommandMessage('app:my-cache:clean-up --dir=var/temp'));
$this->bus->dispatch(new RunCommandMessage('cache:clear'));
}
}
您可以組態在命令執行期間發生錯誤時的行為。若要執行此操作,您可以在建立 RunCommandMessage 實例時,使用 throwOnFailure
和 catchExceptions
參數。
處理完成後,處理常式將傳回 RunCommandContext,其中包含許多有用的資訊,例如結束代碼或程序的輸出。您可以參閱關於處理常式結果的專門頁面以取得更多資訊。
觸發外部程序
Messenger 隨附一個方便的輔助程式,可透過分派訊息來執行外部程序。這利用了 Process 組件。透過分派 RunProcessMessage,Messenger 將負責使用您傳遞的參數建立新的程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Process\Messenger\RunProcessMessage;
class CleanUpService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function cleanUp(): void
{
$this->bus->dispatch(new RunProcessMessage(['rm', '-rf', 'var/log/temp/*'], cwd: '/my/custom/working-dir'));
// ...
}
}
處理完成後,處理常式將傳回 RunProcessContext,其中包含許多有用的資訊,例如結束代碼或程序的輸出。您可以參閱關於處理常式結果的專門頁面以取得更多資訊。
Ping Web 服務
有時,您可能需要定期 ping Web 服務以取得其狀態,例如,它是啟動還是關閉。可以透過分派 PingWebhookMessage 來執行此操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
use Symfony\Component\HttpClient\Messenger\PingWebhookMessage;
use Symfony\Component\Messenger\MessageBusInterface;
class LivenessService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function ping(): void
{
// An HttpExceptionInterface is thrown on 3xx/4xx/5xx
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status'));
// Ping, but does not throw on 3xx/4xx/5xx
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status', throw: false));
// Any valid HttpClientInterface option can be used
$this->bus->dispatch(new PingWebhookMessage('POST', 'https://example.com/status', [
'headers' => [
'Authorization' => 'Bearer ...'
],
'json' => [
'data' => 'some-data',
],
]));
}
}
處理常式將傳回 ResponseInterface,讓您可以收集和處理 HTTP 請求傳回的資訊。
從您的處理器取得結果
當訊息被處理時,HandleMessageMiddleware 會為每個處理訊息的物件新增 HandledStamp。您可以使用它來取得處理常式傳回的值
1 2 3 4 5 6 7 8 9 10 11
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
$envelope = $messageBus->dispatch(new SomeMessage());
// get the value that was returned by the last message handler
$handledStamp = $envelope->last(HandledStamp::class);
$handledStamp->getResult();
// or get info about all of handlers
$handledStamps = $envelope->all(HandledStamp::class);
使用命令 & 查詢總線時取得結果
Messenger 組件可用於 CQRS 架構中,其中命令和查詢匯流排是應用程式的核心組件。請閱讀 Martin Fowler 的 關於 CQRS 的文章以了解更多資訊,以及如何組態多個匯流排。
由於查詢通常是同步的且預期只處理一次,因此從處理常式取得結果是常見的需求。
存在 HandleTrait,可在同步處理時取得處理常式的結果。它也確保只註冊一個處理常式。HandleTrait
可用於具有 $messageBus
屬性的任何類別中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
// src/Action/ListItems.php
namespace App\Action;
use App\Message\ListItemsQuery;
use App\MessageHandler\ListItemsQueryResult;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;
class ListItems
{
use HandleTrait;
public function __construct(
private MessageBusInterface $messageBus,
) {
}
public function __invoke(): void
{
$result = $this->query(new ListItemsQuery(/* ... */));
// Do something with the result
// ...
}
// Creating such a method is optional, but allows type-hinting the result
private function query(ListItemsQuery $query): ListItemsQueryResult
{
return $this->handle($query);
}
}
因此,您可以使用 trait 來建立命令和查詢匯流排類別。例如,您可以建立一個特殊的 QueryBus
類別,並在任何需要查詢匯流排行為的地方注入它,而不是 MessageBusInterface
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// src/MessageBus/QueryBus.php
namespace App\MessageBus;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;
class QueryBus
{
use HandleTrait;
public function __construct(
private MessageBusInterface $messageBus,
) {
}
/**
* @param object|Envelope $query
*
* @return mixed The handler returned value
*/
public function query($query): mixed
{
return $this->handle($query);
}
}
自訂處理器
手動設定處理器
Symfony 通常會自動尋找並註冊您的處理常式。但是,您也可以在使用 #AsMessageHandler
屬性或使用 messenger.message_handler
標籤標記處理常式服務時,手動組態處理常式,並傳遞一些額外的組態。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message): void
{
// ...
}
}
使用標籤組態的可能選項為
bus
- 處理常式可以從中接收訊息的匯流排名稱,預設為所有匯流排。
from_transport
- 處理常式可以從中接收訊息的傳輸名稱,預設為所有傳輸。
handles
- 可以由處理常式處理的訊息類型 (FQCN),只有在無法透過類型提示猜測時才需要。
method
- 將處理訊息的方法名稱。
priority
- 當多個處理常式可以處理相同的訊息時,處理常式的優先順序。
處理多個訊息
單一處理常式類別可以處理多個訊息。為此,請將 #AsMessageHandler
屬性新增至所有處理方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
class SmsNotificationHandler
{
#[AsMessageHandler]
public function handleSmsNotification(SmsNotification $message): void
{
// ...
}
#[AsMessageHandler]
public function handleOtherSmsNotification(OtherSmsNotification $message): void
{
// ...
}
}
事務訊息:在處理完成後處理新訊息
訊息處理常式可以在處理其他訊息時分派
新訊息,到相同或不同的匯流排 (如果應用程式有多個匯流排)。在此過程中發生的任何錯誤或例外狀況都可能產生意想不到的後果,例如
- 如果使用
DoctrineTransactionMiddleware
且分派的訊息擲回例外狀況,則原始處理常式中的任何資料庫交易都將被回滾。 - 如果訊息被分派到不同的匯流排,則即使目前處理常式稍後的某些程式碼擲回例外狀況,分派的訊息仍將被處理。
範例 RegisterUser
程序
考慮一個同時具有命令和事件匯流排的應用程式。應用程式將名為 RegisterUser
的命令分派到命令匯流排。RegisterUserHandler
處理常式會處理該命令,該處理常式會建立 User
物件、將該物件儲存到資料庫,並將 UserRegistered
訊息分派到事件匯流排。
有許多 UserRegistered
訊息的處理常式,其中一個處理常式可能會向新使用者傳送歡迎電子郵件。我們正在使用 DoctrineTransactionMiddleware
將所有資料庫查詢包裝在一個資料庫交易中。
問題 1:如果在傳送歡迎電子郵件時擲回例外狀況,則使用者將不會被建立,因為 DoctrineTransactionMiddleware
將回滾 Doctrine 交易,使用者已在其中建立。
問題 2:如果在將使用者儲存到資料庫時擲回例外狀況,則歡迎電子郵件仍會傳送,因為它是非同步處理的。
DispatchAfterCurrentBusMiddleware 中介軟體
對於許多應用程式而言,所需的行為是僅在處理常式完全完成後,才處理由處理常式分派的訊息。這可以使用 DispatchAfterCurrentBusMiddleware
並將 DispatchAfterCurrentBusStamp
戳記新增至 訊息信封 來完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;
use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
class RegisterUserHandler
{
public function __construct(
private MessageBusInterface $eventBus,
private EntityManagerInterface $em,
) {
}
public function __invoke(RegisterUser $command): void
{
$user = new User($command->getUuid(), $command->getName(), $command->getEmail());
$this->em->persist($user);
// The DispatchAfterCurrentBusStamp marks the event message to be handled
// only if this handler does not throw an exception.
$event = new UserRegistered($command->getUuid());
$this->eventBus->dispatch(
(new Envelope($event))
->with(new DispatchAfterCurrentBusStamp())
);
// ...
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;
use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;
class WhenUserRegisteredThenSendWelcomeEmail
{
public function __construct(
private MailerInterface $mailer,
EntityManagerInterface $em,
) {
}
public function __invoke(UserRegistered $event): void
{
$user = $this->em->getRepository(User::class)->find($event->getUuid());
$this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
}
}
這表示 UserRegistered
訊息將在 RegisterUserHandler
完成且新的 User
持續保存到資料庫之後才會被處理。如果 RegisterUserHandler
遇到例外狀況,則 UserRegistered
事件將永遠不會被處理。如果在傳送歡迎電子郵件時擲回例外狀況,則 Doctrine 交易將不會被回滾。
注意
如果 WhenUserRegisteredThenSendWelcomeEmail
擲回例外狀況,則該例外狀況將被包裝在 DelayedMessageHandlingException
中。使用 DelayedMessageHandlingException::getWrappedExceptions
將為您提供在使用 DispatchAfterCurrentBusStamp
處理訊息時擲回的所有例外狀況。
dispatch_after_current_bus
中介軟體預設為啟用。如果您要手動組態中介軟體,請務必在 middleware 鏈中於 doctrine_transaction
之前註冊 dispatch_after_current_bus
。此外,所有正在使用的匯流排都必須載入 dispatch_after_current_bus
中介軟體。
將處理器綁定到不同的傳輸器
每個訊息可以有多個處理常式,並且當訊息被取用時,會呼叫其所有處理常式。但是,您也可以組態處理常式僅在從特定傳輸接收到訊息時才被呼叫。這允許您擁有單一訊息,其中每個處理常式都由取用不同傳輸的不同「工作者」呼叫。
假設您有一個具有兩個處理常式的 UploadedImage
訊息
ThumbnailUploadedImageHandler
:您希望由名為image_transport
的傳輸來處理NotifyAboutNewUploadedImageHandler
:您希望由名為async_priority_normal
的傳輸來處理
若要執行此操作,請將 from_transport
選項新增至每個處理常式。例如
1 2 3 4 5 6 7 8 9 10 11 12 13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;
use App\Message\UploadedImage;
#[AsMessageHandler(fromTransport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
public function __invoke(UploadedImage $uploadedImage): void
{
// do some thumbnailing
}
}
並且類似地
1 2 3 4 5 6 7 8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...
#[AsMessageHandler(fromTransport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
// ...
}
然後,請確保將您的訊息「路由」到兩個傳輸
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: # ...
image_transport: # ...
routing:
# ...
'App\Message\UploadedImage': [image_transport, async_priority_normal]
就是這樣!您現在可以取用每個傳輸
1 2 3 4
# will only call ThumbnailUploadedImageHandler when handling the message
$ php bin/console messenger:consume image_transport -vv
$ php bin/console messenger:consume async_priority_normal -vv
警告
如果處理常式沒有 from_transport
組態,則它將在訊息接收自的每個傳輸上執行。
批次處理訊息
您可以宣告「特殊」處理常式,這些處理常式將分批處理訊息。透過這樣做,處理常式將等待一定數量的訊息處於擱置狀態,然後再處理它們。批次處理常式的宣告是透過實作 BatchHandlerInterface 來完成的。BatchHandlerTrait 也提供用於簡化這些特殊處理常式的宣告
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
class MyBatchHandler implements BatchHandlerInterface
{
use BatchHandlerTrait;
public function __invoke(MyMessage $message, ?Acknowledger $ack = null): mixed
{
return $this->handle($message, $ack);
}
private function process(array $jobs): void
{
foreach ($jobs as [$message, $ack]) {
try {
// Compute $result from $message...
// Acknowledge the processing of the message
$ack->ack($result);
} catch (\Throwable $e) {
$ack->nack($e);
}
}
}
// Optionally, you can override some of the trait methods, such as the
// `getBatchSize()` method, to specify your own batch size...
private function getBatchSize(): int
{
return 100;
}
}
注意
當 __invoke()
的 $ack
引數為 null
時,預期訊息會同步處理。否則,預期 __invoke()
會傳回擱置訊息的數量。BatchHandlerTrait 會為您處理此問題。
注意
預設情況下,擱置的批次會在工作者閒置以及停止時刷新。
擴展 Messenger
信封 & 戳記
訊息可以是任何 PHP 物件。有時,您可能需要組態關於訊息的一些額外資訊,例如它在 AMQP 中的處理方式,或在訊息應被處理之前新增延遲。您可以透過將「戳記」新增至您的訊息來執行此操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
public function index(MessageBusInterface $bus): void
{
// wait 5 seconds before processing
$bus->dispatch(new SmsNotification('...'), [
new DelayStamp(5000),
]);
// or explicitly create an Envelope
$bus->dispatch(new Envelope(new SmsNotification('...'), [
new DelayStamp(5000),
]));
// ...
}
在內部,每個訊息都包裝在 Envelope
中,它包含訊息和戳記。您可以手動建立此信封,或允許訊息匯流排執行此操作。有多種不同的戳記用於不同的目的,它們在內部用於追蹤關於訊息的資訊,例如處理它的訊息匯流排,或它是否在失敗後正在重試。
中介層
當您將訊息分派到訊息匯流排時會發生什麼情況,取決於其中介軟體的集合及其順序。預設情況下,為每個匯流排組態的中介軟體如下所示
add_bus_name_stamp_middleware
- 新增戳記以記錄此訊息分派到哪個匯流排;dispatch_after_current_bus
- 請參閱Messenger:同步 & 佇列訊息處理;failed_message_processing_middleware
- 處理透過失敗傳輸重試的訊息,使其正常運作,就像它們是從原始傳輸接收到的一樣;- 您自己的中介軟體集合;
send_message
- 如果為傳輸組態了路由,則此選項會將訊息傳送至該傳輸並停止中介軟體鏈;handle_message
- 呼叫給定訊息的訊息處理常式。
注意
這些中介軟體名稱實際上是捷徑名稱。真正的服務 ID 以 messenger.middleware.
為字首 (例如 messenger.middleware.handle_message
)。
中介軟體在訊息分派時執行,但也在透過工作者接收訊息時再次執行 (對於傳送至傳輸以進行非同步處理的訊息)。如果您建立自己的中介軟體,請記住這一點。
您可以將自己的中介軟體新增至此清單,或完全停用預設中介軟體,並僅包含您自己的中介軟體。
如果中介軟體服務是抽象的,您可以組態其建構子的引數,並且每個匯流排都會建立不同的實例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# config/packages/messenger.yaml
framework:
messenger:
buses:
messenger.bus.default:
# disable the default middleware
default_middleware: false
middleware:
# use and configure parts of the default middleware you want
- 'add_bus_name_stamp_middleware': ['messenger.bus.default']
# add your own services that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
- 'App\Middleware\MyMiddleware'
- 'App\Middleware\AnotherMiddleware'
提示
如果您已安裝 MakerBundle,則可以使用 make:messenger-middleware
命令來引導建立您自己的 messenger 中介軟體。
Doctrine 的中介層
如果您在應用程式中使用 Doctrine,則存在許多您可能想要使用的選用中介軟體
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
# each time a message is handled, the Doctrine connection
# is "pinged" and reconnected if it's closed. Useful
# if your workers run for a long time and the database
# connection is sometimes lost
- doctrine_ping_connection
# After handling, the Doctrine connection is closed,
# which can free up database connections in a worker,
# instead of keeping them open forever
- doctrine_close_connection
# logs an error when a Doctrine transaction was opened but not closed
- doctrine_open_transaction_logger
# wraps all handlers in a single Doctrine transaction
# handlers do not need to call flush() and an error
# in any handler will cause a rollback
- doctrine_transaction
# or pass a different entity manager to any
#- doctrine_transaction: ['custom']
其他中介層
如果您需要在消費者中產生絕對 URL (例如,使用連結呈現範本),請新增 router_context
中介軟體。此中介軟體會儲存建立絕對 URL 時所需的原始請求內容 (即主機、HTTP 連接埠等)。
如果您需要在處理訊息物件之前,使用 Validator 組件驗證訊息物件,請新增 validation
中介軟體。如果驗證失敗,將擲回 ValidationFailedException
。ValidationStamp 可用於組態驗證群組。
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
- router_context
- validation
Messenger 事件
除了中介軟體之外,Messenger 還會分派數個事件。您可以建立事件接聽器以掛鉤到程序的各個部分。對於每個事件,事件類別都是事件名稱
額外處理器引數
可以使用 HandlerArgumentsStamp,讓 messenger 將額外資料傳遞至訊息處理常式。將此戳記新增至中介軟體中的信封,並填入您希望在處理常式中可用的任何額外資料
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// src/Messenger/AdditionalArgumentMiddleware.php
namespace App\Messenger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
final class AdditionalArgumentMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$envelope = $envelope->with(new HandlerArgumentsStamp([
$this->resolveAdditionalArgument($envelope->getMessage()),
]));
return $stack->next()->handle($envelope, $stack);
}
private function resolveAdditionalArgument(object $message): mixed
{
// ...
}
}
然後您的處理常式將如下所示
1 2 3 4 5 6 7 8 9 10 11 12
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
final class SmsNotificationHandler
{
public function __invoke(SmsNotification $message, mixed $additionalArgument)
{
// ...
}
}
自訂資料格式的訊息序列化器
如果您從其他應用程式接收訊息,則它們的格式可能與您需要的格式不完全相同。並非所有應用程式都會傳回具有 body
和 headers
欄位的 JSON 訊息。在這些情況下,您需要建立新的訊息序列化器,以實作 SerializerInterface。假設您想要建立訊息解碼器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
namespace App\Messenger\Serializer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class MessageWithTokenDecoder implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
try {
// parse the data you received with your custom fields
$data = $encodedEnvelope['data'];
$data['token'] = $encodedEnvelope['token'];
// other operations like getting information from stamps
} catch (\Throwable $throwable) {
// wrap any exception that may occur in the envelope to send it to the failure transport
return new Envelope($throwable);
}
return new Envelope($data);
}
public function encode(Envelope $envelope): array
{
// this decoder does not encode messages, but you can implement it by returning
// an array with serialized stamps if you need to send messages in a custom format
throw new \LogicException('This serializer is only used for decoding messages.');
}
}
下一步是告訴 Symfony 在您的一個或多個傳輸中使用此序列化器
1 2 3 4 5 6 7
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport:
dsn: '%env(MY_TRANSPORT_DSN)%'
serializer: 'App\Messenger\Serializer\MessageWithTokenDecoder'
多個總線、命令 & 事件總線
Messenger 預設為您提供單一訊息匯流排服務。但是,您可以組態任意數量的訊息匯流排,建立「命令」、「查詢」或「事件」匯流排,並控制它們的中介軟體。
建置應用程式時的常見架構是將命令與查詢分開。命令是執行某些操作的動作,而查詢則擷取資料。這稱為 CQRS (命令查詢職責分離)。請參閱 Martin Fowler 的 關於 CQRS 的文章以了解更多資訊。此架構可以與 Messenger 組件結合使用,方法是定義多個匯流排。
命令匯流排與查詢匯流排略有不同。例如,命令匯流排通常不提供任何結果,而查詢匯流排很少是非同步的。您可以使用中介軟體組態這些匯流排及其規則。
透過引入事件匯流排來將動作與反應分開也可能是個好主意。事件匯流排可以有零個或多個訂閱者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
framework:
messenger:
# The bus that is going to be injected when injecting MessageBusInterface
default_bus: command.bus
buses:
command.bus:
middleware:
- validation
- doctrine_transaction
query.bus:
middleware:
- validation
event.bus:
default_middleware:
enabled: true
# set "allow_no_handlers" to true (default is false) to allow having
# no handler configured for this bus without throwing an exception
allow_no_handlers: false
# set "allow_no_senders" to false (default is true) to throw an exception
# if no sender is configured for this bus
allow_no_senders: true
middleware:
- validation
這將建立三個新服務
command.bus
:可使用 MessageBusInterface 類型提示自動連線 (因為這是default_bus
);query.bus
:可使用MessageBusInterface $queryBus
自動連線;event.bus
:可使用MessageBusInterface $eventBus
自動連線。
限制每個總線的處理器
預設情況下,每個處理常式都可以在所有匯流排上處理訊息。若要防止在沒有錯誤的情況下將訊息分派到錯誤的匯流排,您可以使用 messenger.message_handler
標籤將每個處理常式限制為特定的匯流排
1 2 3 4
# config/services.yaml
services:
App\MessageHandler\SomeCommandHandler:
tags: [{ name: messenger.message_handler, bus: command.bus }]
透過這種方式,App\MessageHandler\SomeCommandHandler
處理常式將僅為 command.bus
匯流排所知。
您也可以使用 _instanceof 服務組態,自動將此標籤新增至許多類別。使用此組態,您可以根據實作的介面判斷訊息匯流排
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# config/services.yaml
services:
# ...
_instanceof:
# all services implementing the CommandHandlerInterface
# will be registered on the command.bus bus
App\MessageHandler\CommandHandlerInterface:
tags:
- { name: messenger.message_handler, bus: command.bus }
# while those implementing QueryHandlerInterface will be
# registered on the query.bus bus
App\MessageHandler\QueryHandlerInterface:
tags:
- { name: messenger.message_handler, bus: query.bus }
偵錯總線
debug:messenger
命令會列出每個匯流排可用的訊息和處理常式。您也可以透過提供匯流排名稱作為引數,將清單限制為特定的匯流排。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
$ php bin/console debug:messenger
Messenger
=========
command.bus
-----------
The following messages can be dispatched:
---------------------------------------------------------------------------------------
App\Message\DummyCommand
handled by App\MessageHandler\DummyCommandHandler
App\Message\MultipleBusesMessage
handled by App\MessageHandler\MultipleBusesMessageHandler
---------------------------------------------------------------------------------------
query.bus
---------
The following messages can be dispatched:
---------------------------------------------------------------------------------------
App\Message\DummyQuery
handled by App\MessageHandler\DummyQueryHandler
App\Message\MultipleBusesMessage
handled by App\MessageHandler\MultipleBusesMessageHandler
---------------------------------------------------------------------------------------
提示
該命令還將顯示訊息和處理常式類別的 PHPDoc 描述。
重新分派訊息
如果您想要重新分派訊息 (使用相同的傳輸和信封),請建立新的 RedispatchMessage 並透過您的匯流排分派它。重新使用先前顯示的相同 SmsNotification
範例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Message\RedispatchMessage;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __construct(private MessageBusInterface $bus)
{
}
public function __invoke(SmsNotification $message): void
{
// do something with the message
// then redispatch it based on your own logic
if ($needsRedispatch) {
$this->bus->dispatch(new RedispatchMessage($message));
}
}
}
內建的 RedispatchMessageHandler 將負責處理此訊息,以透過最初分派它的相同匯流排重新分派它。您也可以使用 RedispatchMessage
建構子的第二個引數來提供在重新分派訊息時要使用的傳輸。