跳到內容

Messenger:同步 & 排隊訊息處理

編輯此頁面

Messenger 提供訊息總線,能夠傳送訊息,然後在您的應用程式中立即處理,或透過傳輸器 (例如佇列) 傳送訊息,以便稍後處理。若要深入瞭解,請閱讀 Messenger 组件文件

安裝

在使用 Symfony Flex 的應用程式中,執行此命令以安裝 messenger

1
$ composer require symfony/messenger

建立訊息 & 處理器

Messenger 圍繞著您將建立的兩個不同類別:(1) 訊息類別,用於保存資料,以及 (2) 處理器類別,該類別將在分派該訊息時被呼叫。處理器類別將讀取訊息類別並執行一或多個任務。

訊息類別沒有特定的要求,除了它可以被序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/Message/SmsNotification.php
namespace App\Message;

class SmsNotification
{
    public function __construct(
        private string $content,
    ) {
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

訊息處理器是一個 PHP 可呼叫物件,建議的建立方式是建立一個具有 AsMessageHandler 屬性,並且具有以訊息類別 (或訊息介面) 類型提示的 __invoke() 方法的類別

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message)
    {
        // ... do some work - like sending an SMS message!
    }
}

提示

您也可以在個別類別方法上使用 #[AsMessageHandler] 屬性。您可以在單個類別中根據需要對任意多個方法使用此屬性,讓您可以將多種類型相關訊息的處理分組。

感謝 自動配置SmsNotification 類型提示,Symfony 知道在分派 SmsNotification 訊息時應呼叫此處理器。大多數情況下,這就是您需要做的全部工作。但是您也可以手動設定訊息處理器。若要查看所有已設定的處理器,請執行

1
$ php bin/console debug:messenger

分派訊息

您準備好了!若要分派訊息 (並呼叫處理器),請注入 messenger.default_bus 服務 (透過 MessageBusInterface),就像在控制器中一樣

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// src/Controller/DefaultController.php
namespace App\Controller;

use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus): Response
    {
        // will cause the SmsNotificationHandler to be called
        $bus->dispatch(new SmsNotification('Look! I created a message!'));

        // ...
    }
}

傳輸器:非同步/排隊訊息

預設情況下,訊息會在分派後立即處理。如果您想要非同步處理訊息,您可以設定傳輸器。傳輸器能夠傳送訊息 (例如到佇列系統),然後透過 Worker 接收它們。Messenger 支援多個傳輸器

注意

如果您想要使用不受支援的傳輸器,請查看 Enqueue 的傳輸器,它支援 Kafka 和 Google Pub/Sub 等服務。

傳輸器使用 "DSN" 註冊。感謝 Messenger 的 Flex recipe,您的 .env 檔案已經有一些範例。

1
2
3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://127.0.0.1:6379/messages

取消註解您想要的任何傳輸器 (或在 .env.local 中設定)。請參閱 Messenger:同步 & 排隊訊息處理 以取得更多詳細資訊。

接下來,在 config/packages/messenger.yaml 中,讓我們定義一個名為 async 的傳輸器,它使用此設定

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

            # or expanded to configure more options
            #async:
            #    dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            #    options: []

將訊息路由到傳輸器

現在您已經設定了傳輸器,您可以將訊息設定為傳送到傳輸器,而不是立即處理訊息

1
2
3
4
5
6
7
8
9
10
// src/Message/SmsNotification.php
namespace App\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage('async')]
class SmsNotification
{
    // ...
}

7.2

#[AsMessage] 屬性在 Symfony 7.2 中引入。

如此一來,App\Message\SmsNotification 將被傳送到 async 傳輸器,並且它的處理器將不會立即被呼叫。在 routing 下未匹配的任何訊息仍將立即處理,即同步處理。

注意

如果您同時使用 YAML/XML/PHP 設定檔和 PHP 屬性設定路由,則設定始終優先於類別屬性。此行為允許您在每個環境的基礎上覆寫路由。

注意

在單獨的 YAML/XML/PHP 檔案中設定路由時,您可以使用部分 PHP 命名空間,例如 'App\Message\*' 來匹配匹配命名空間中的所有訊息。唯一的要求是 '*' 通配符必須放在命名空間的末尾。

您可以將 '*' 用作訊息類別。這將充當任何在 routing 下未匹配的訊息的預設路由規則。這對於確保預設情況下沒有訊息被同步處理非常有用。

唯一的缺點是 '*' 也將應用於使用 Symfony Mailer 發送的電子郵件 (當 Messenger 可用時,它使用 SendEmailMessage)。如果您的電子郵件不可序列化 (例如,如果它們包含作為 PHP 資源/流的文件附件),這可能會導致問題。

您也可以按其父類別或介面路由類別。或者將訊息傳送到多個傳輸器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// src/Message/SmsNotification.php
namespace App\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(['async', 'audit'])]
class SmsNotification
{
    // ...
}

// if you prefer, you can also apply multiple attributes to the message class
#[AsMessage('async')]
#[AsMessage('audit')]
class SmsNotification
{
    // ...
}

注意

如果您為子類別和父類別都設定了路由,則兩個規則都會被使用。例如,如果您有一個從 Notification 擴展而來的 SmsNotification 物件,則 NotificationSmsNotification 的路由都將被使用。

提示

您可以使用訊息信封上的 TransportNamesStamp 在運行時定義和覆寫訊息正在使用的傳輸器。此戳記將傳輸器名稱陣列作為其唯一引數。有關戳記的更多資訊,請參閱 信封 & 戳記

訊息中的 Doctrine 實體

如果您需要在訊息中傳遞 Doctrine 實體,最好傳遞實體的主鍵 (或處理器實際需要的任何相關資訊,例如 email 等) 而不是物件 (否則您可能會看到與實體管理器相關的錯誤)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;

class NewUserWelcomeEmail
{
    public function __construct(
        private int $userId,
    ) {
    }

    public function getUserId(): int
    {
        return $this->userId;
    }
}

然後,在您的處理器中,您可以查詢新的物件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;

use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
    public function __construct(
        private UserRepository $userRepository,
    ) {
    }

    public function __invoke(NewUserWelcomeEmail $welcomeEmail): void
    {
        $user = $this->userRepository->find($welcomeEmail->getUserId());

        // ... send an email!
    }
}

這保證了實體包含最新的資料。

同步處理訊息

如果訊息不符合任何路由規則,則它不會被傳送到任何傳輸器,並且將立即處理。在某些情況下 (例如當將處理器綁定到不同的傳輸器時),顯式處理此問題更容易或更靈活:透過建立 sync 傳輸器並在那裡 "傳送" 訊息以立即處理

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # ... other transports

            sync: 'sync://'

        routing:
            App\Message\SmsNotification: sync

建立您自己的傳輸器

如果您需要從不受支援的物件傳送或接收訊息,您也可以建立自己的傳輸器。請參閱 如何建立您自己的 Messenger 傳輸器

消費訊息 (執行 Worker)

一旦您的訊息被路由,在大多數情況下,您需要 "消費" 它們。您可以使用 messenger:consume 命令來執行此操作

1
2
3
4
$ php bin/console messenger:consume async

# use -vv to see details about what's happening
$ php bin/console messenger:consume async -vv

第一個引數是接收器的名稱 (或服務 ID,如果您路由到自訂服務)。預設情況下,命令將永遠運行:在您的傳輸器上尋找新訊息並處理它們。此命令稱為您的 "worker"。

如果您想要消費來自所有可用接收器的訊息,您可以使用帶有 --all 選項的命令

1
$ php bin/console messenger:consume --all

7.1

--all 選項在 Symfony 7.1 中引入。

--keepalive 選項可用於防止訊息在長時間運行的處理過程中過早地重新傳遞。它將訊息標記為 "處理中",並防止在 Worker 完成處理之前重新傳遞它。

注意

此選項僅適用於受支援的傳輸器,即 Beanstalkd 和 AmazonSQS 傳輸器。

7.2

--keepalive 選項在 Symfony 7.2 中引入。

提示

在開發環境中,如果您正在使用 Symfony CLI 工具,您可以設定 Worker 與 Web 伺服器一起自動運行。您可以在 Symfony CLI Workers 文件中找到更多資訊。

提示

若要正確停止 Worker,請拋出 StopWorkerException 的實例。

部署到生產環境

在生產環境中,有一些重要的事項需要考慮

使用 Process Manager (例如 Supervisor 或 systemd) 來保持您的 Worker 運行
您需要一個或多個 "worker" 始終運行。若要執行此操作,請使用程序控制系統,例如 Supervisorsystemd
不要讓 Worker 永遠運行
某些服務 (例如 Doctrine 的 EntityManager) 會隨著時間的推移消耗更多記憶體。因此,不要讓您的 Worker 永遠運行,而是使用諸如 messenger:consume --limit=10 之類的標誌來告訴您的 Worker 僅處理 10 條訊息後退出 (然後程序管理器將建立一個新程序)。還有其他選項,例如 --memory-limit=128M--time-limit=3600
停止遇到錯誤的 Worker
如果 Worker 依賴項 (例如您的資料庫伺服器) 關閉,或達到逾時,您可以嘗試新增重新連線邏輯,或者如果 Worker 收到太多錯誤,可以使用 messenger:consume 命令的 --failure-limit 選項來退出 Worker。
在部署時重新啟動 Worker
每次部署時,您都需要重新啟動所有 Worker 程序,以便它們看到新部署的程式碼。若要執行此操作,請在部署時運行 messenger:stop-workers。這將向每個 Worker 發出信號,指示它應該完成當前正在處理的訊息並應優雅地關閉。然後,程序管理器將建立新的 Worker 程序。該命令在內部使用 app 快取 - 因此請確保將其設定為使用您喜歡的适配器。
在部署之間使用相同的快取
如果您的部署策略涉及建立新的目標目錄,您應該為 cache.prefix_seed 設定選項設定一個值,以便在部署之間使用相同的快取命名空間。否則,cache.app 池將使用 kernel.project_dir 參數的值作為命名空間的基礎,這將導致每次進行新部署時使用不同的命名空間。

優先順序傳輸器

有時,某些類型的訊息應該具有更高的優先順序,並在其他訊息之前處理。為了使這成為可能,您可以建立多個傳輸器並將不同的訊息路由到它們。例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # queue_name is specific to the doctrine transport
                    queue_name: high

                    # for AMQP send to a separate exchange then queue
                    #exchange:
                    #    name: high
                    #queues:
                    #    messages_high: ~
                    # for redis try "group"
            async_priority_low:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    queue_name: low

        routing:
            'App\Message\SmsNotification': async_priority_low
            'App\Message\NewUserWelcomeEmail': async_priority_high

然後,您可以為每個傳輸器運行單獨的 Worker,或指示一個 Worker 按優先順序處理訊息

1
$ php bin/console messenger:consume async_priority_high async_priority_low

Worker 將始終首先尋找在 async_priority_high 上等待的訊息。如果沒有,然後它將消費來自 async_priority_low 的訊息。

限制消費特定佇列

某些傳輸器 (尤其是 AMQP) 具有交換器和佇列的概念。Symfony 傳輸器始終綁定到交換器。預設情況下,Worker 從附加到指定傳輸器的交換器的所有佇列中消費。但是,在某些用例中,您可能希望 Worker 僅從特定佇列中消費。

您可以限制 Worker 僅處理來自特定佇列的訊息

1
2
3
4
$ php bin/console messenger:consume my_transport --queues=fasttrack

# you can pass the --queues option more than once to process multiple queues
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2

注意

若要允許使用 queues 選項,接收器必須實現 QueueReceiverInterface

檢查每個傳輸器的排隊訊息數量

運行 messenger:stats 命令以了解某些或所有傳輸器的 "佇列" 中有多少訊息

1
2
3
4
5
6
7
8
9
# displays the number of queued messages in all transports
$ php bin/console messenger:stats

# shows stats only for some transports
$ php bin/console messenger:stats my_transport_name other_transport_name

# you can also output the stats in JSON format
$ php bin/console messenger:stats --format=json
$ php bin/console messenger:stats my_transport_name other_transport_name --format=json

7.2

format 選項在 Symfony 7.2 中引入。

注意

為了使此命令起作用,設定的傳輸器的接收器必須實現 MessageCountAwareInterface

Supervisor 設定

Supervisor 是一個很棒的工具,可以保證您的 Worker 程序始終運行 (即使它由於故障、達到訊息限制或感謝 messenger:stop-workers 而關閉)。例如,您可以透過以下方式在 Ubuntu 上安裝它

1
$ sudo apt-get install supervisor

Supervisor 設定檔通常位於 /etc/supervisor/conf.d 目錄中。例如,您可以在那裡建立一個新的 messenger-worker.conf 檔案,以確保 messenger:consume 的 2 個實例始終在運行

1
2
3
4
5
6
7
8
9
10
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
startretries=10
process_name=%(program_name)s_%(process_num)02d

變更 async 引數以使用您的傳輸方式(或多種傳輸方式)的名稱,並將 user 變更為您伺服器上的 Unix 使用者。

警告

在部署期間,某些東西可能無法使用(例如,資料庫),導致消費者啟動失敗。在這種情況下,Supervisor 將嘗試 startretries 次數來重新啟動命令。請務必變更此設定,以避免命令進入 FATAL 狀態,該狀態將永遠不會再次重新啟動。

每次重新啟動,Supervisor 都會將延遲時間增加 1 秒。例如,如果值為 10,它將等待 1 秒、2 秒、3 秒等。這給予服務總共 55 秒的時間再次可用。增加 startretries 設定以涵蓋最大預期停機時間。

如果您使用 Redis Transport,請注意每個 worker 都需要一個唯一的消費者名稱,以避免同一個訊息被多個 worker 處理。實現此目的的一種方法是在 Supervisor 設定檔中設定環境變數,然後您可以在 messenger.yaml 中參考該變數(請參閱下方的 Redis 章節

1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d

接下來,告訴 Supervisor 讀取您的設定並啟動您的 workers

1
2
3
4
5
6
7
8
9
$ sudo supervisorctl reread

$ sudo supervisorctl update

$ sudo supervisorctl start messenger-consume:*

# If you deploy an update of your code, don't forget to restart your workers
# to run the new code
$ sudo supervisorctl restart messenger-consume:*

請參閱 Supervisor 文件 以取得更多詳細資訊。

優雅關閉

如果您在專案中安裝了 PCNTL PHP 擴充功能,workers 將處理 SIGTERMSIGINT POSIX 訊號,以在終止前完成處理當前訊息。

但是,您可能更喜歡使用不同的 POSIX 訊號來進行優雅關閉。您可以透過設定 framework.messenger.stop_worker_on_signals 設定選項來覆寫預設訊號。

在某些情況下,SIGTERM 訊號是由 Supervisor 本身發送的(例如,停止將 Supervisor 作為其 entrypoint 的 Docker 容器)。在這些情況下,您需要在程式設定中新增 stopwaitsecs 鍵(值為所需的寬限期秒數),以便執行優雅關閉

1
2
[program:x]
stopwaitsecs=20

Systemd 設定

雖然 Supervisor 是一個很棒的工具,但它的缺點是您需要系統存取權限才能執行它。Systemd 已成為大多數 Linux 發行版的標準,並且有一個稱為使用者服務的良好替代方案。

Systemd 使用者服務設定檔通常位於 ~/.config/systemd/user 目錄中。例如,您可以建立一個新的 messenger-worker.service 檔案。或者,如果您想要同時運行更多實例,則可以建立 messenger-worker@.service 檔案

1
2
3
4
5
6
7
8
9
10
11
12
[Unit]
Description=Symfony messenger-consume %i

[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
# for Redis, set a custom consumer name for each instance
Environment="MESSENGER_CONSUMER_NAME=symfony-%n-%i"
Restart=always
RestartSec=30

[Install]
WantedBy=default.target

現在,告訴 systemd 啟用並啟動一個 worker

1
2
3
4
5
6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service

# to enable and start 20 workers
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service

如果您變更了服務設定檔,則需要重新載入 daemon

1
$ systemctl --user daemon-reload

要重新啟動所有消費者

1
$ systemctl --user restart messenger-consume@*.service

systemd 使用者實例僅在特定使用者的第一次登入後啟動。消費者通常需要在系統啟動時啟動。啟用使用者的 lingering 功能以啟動該行為

1
$ loginctl enable-linger <your-username>

日誌由 journald 管理,可以使用 journalctl 命令進行處理

1
2
3
4
5
6
7
8
# follow logs of consumer nr 11
$ journalctl -f --user-unit messenger-consume@11.service

# follow logs of all consumers
$ journalctl -f --user-unit messenger-consume@*

# follow all logs from your user services
$ journalctl -f _UID=$UID

請參閱 systemd 文件 以取得更多詳細資訊。

注意

您需要 journalctl 命令的提升權限,或將您的使用者新增到 systemd-journal 群組

1
$ sudo usermod -a -G systemd-journal <your-username>

無狀態 Worker

PHP 的設計是無狀態的,不同請求之間沒有共享資源。在 HTTP 環境中,PHP 在發送回應後會清除所有內容,因此您可以決定不處理可能洩漏記憶體的服務。

另一方面,workers 通常在長時間運行的 CLI 程序中依序處理訊息,這些程序不會在處理單一訊息後結束。請注意服務狀態,以防止資訊和/或記憶體洩漏,因為 Symfony 會將同一個服務實例注入到所有訊息中,從而保留服務的內部狀態。

但是,某些 Symfony 服務(例如 Monolog fingers crossed handler)在設計上會洩漏。Symfony 提供了一個服務重設功能來解決此問題。當在兩個訊息之間自動重設容器時,Symfony 會尋找任何實作 ResetInterface 的服務(包括您自己的服務),並呼叫它們的 reset() 方法,以便它們可以清除其內部狀態。

如果服務不是無狀態的,並且您希望在每個訊息之後重設其屬性,則該服務必須實作 ResetInterface,您可以在 reset() 方法中重設屬性。

如果您不想重設容器,請在運行 messenger:consume 命令時新增 --no-reset 選項。

速率限制傳輸器

有時您可能需要限制訊息 worker 的速率。您可以在傳輸方式上設定速率限制器(需要 RateLimiter component),方法是設定其 rate_limiter 選項

1
2
3
4
5
6
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                rate_limiter: your_rate_limiter_name

警告

當在傳輸方式上設定速率限制器時,當達到限制時,它將阻止整個 worker。您應確保為受速率限制的傳輸方式設定專用的 worker,以避免其他傳輸方式被阻止。

重試 & 失敗

如果在從傳輸方式消費訊息時拋出例外,它將自動重新發送到傳輸方式以再次嘗試。預設情況下,訊息將重試 3 次,然後再被丟棄或 傳送到失敗傳輸方式。每次重試也會延遲,以防故障是由於暫時性問題造成的。所有這些都可以在每個傳輸方式上設定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

                # default configuration
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    # causes the delay to be higher before each retry
                    # e.g. 1 second delay, 2 seconds, 4 seconds
                    multiplier: 2
                    max_delay: 0
                    # applies randomness to the delay that can prevent the thundering herd effect
                    # the value (between 0 and 1.0) is the percentage of 'delay' that will be added/subtracted
                    jitter: 0.1
                    # override all of this with a service that
                    # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
                    # service: null

7.1

jitter 選項是在 Symfony 7.1 中引入的。

提示

當訊息被重試時,Symfony 會觸發 WorkerMessageRetriedEvent,以便您可以運行自己的邏輯。

注意

感謝 SerializedMessageStamp,訊息的序列化形式會被保存,這可以防止在稍後重試訊息時再次序列化它。

避免重試

有時,處理訊息可能會以您知道是永久性的方式失敗,不應重試。如果您拋出 UnrecoverableMessageHandlingException,則不會重試該訊息。

注意

不會重試的訊息仍會顯示在已設定的失敗傳輸方式中。如果您想避免這種情況,請考慮自行處理錯誤並讓處理常式成功結束。

強制重試

有時,處理訊息必須以您知道是暫時性的方式失敗,並且必須重試。如果您拋出 RecoverableMessageHandlingException,則訊息將永遠重試,並且 max_retries 設定將被忽略。

您可以透過在 RecoverableMessageHandlingException 的建構子中設定 retryDelay 引數來定義自訂重試延遲時間(例如,使用 HTTP 回應中的 Retry-After 標頭中的值)。

7.2

retryDelay 引數和 getRetryDelay() 方法是在 Symfony 7.2 中引入的。

儲存 & 重試失敗訊息

如果訊息失敗,它會被重試多次 (max_retries),然後將被丟棄。為了避免這種情況發生,您可以改為設定 failure_transport

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        # after retrying, messages will be sent to the "failed" transport
        failure_transport: failed

        transports:
            # ... other transports

            failed: 'doctrine://default?queue_name=failed'

在此範例中,如果處理訊息失敗 3 次(預設 max_retries),則會將其發送到 failed 傳輸方式。雖然您可以使用 messenger:consume failed 來像正常傳輸方式一樣消費它,但您通常會想要手動檢視失敗傳輸方式中的訊息,並選擇重試它們

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# see all messages in the failure transport with a default limit of 50
$ php bin/console messenger:failed:show

# see the 10 first messages
$ php bin/console messenger:failed:show --max=10

# see only MyClass messages
$ php bin/console messenger:failed:show --class-filter='MyClass'

# see the number of messages by message class
$ php bin/console messenger:failed:show --stats

# see details about a specific failure
$ php bin/console messenger:failed:show 20 -vv

# for each message, this command asks whether to retry, skip, or delete
$ php bin/console messenger:failed:retry -vv

# retry specific messages
$ php bin/console messenger:failed:retry 20 30 --force

# remove a message without retrying it
$ php bin/console messenger:failed:remove 20

# remove messages without retrying them and show each message before removing it
$ php bin/console messenger:failed:remove 20 30 --show-messages

# remove all messages in the failure transport
$ php bin/console messenger:failed:remove --all

如果訊息再次失敗,由於正常的 重試規則,它將重新發送回失敗傳輸方式。一旦達到最大重試次數,訊息將被永久丟棄。

7.2

messenger:failed:retry 命令中跳過訊息的選項是在 Symfony 7.2 中引入的

多個失敗的傳輸器

有時,擁有單一的全域 failed transport 設定是不夠的,因為有些訊息比其他訊息更重要。在這些情況下,您可以僅針對特定傳輸方式覆寫失敗傳輸方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# config/packages/messenger.yaml
framework:
    messenger:
        # after retrying, messages will be sent to the "failed" transport
        # by default if no "failed_transport" is configured inside a transport
        failure_transport: failed_default

        transports:
            async_priority_high:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                failure_transport: failed_high_priority

            # since no failed transport is configured, the one used will be
            # the global "failure_transport" set
            async_priority_low:
                dsn: 'doctrine://default?queue_name=async_priority_low'

            failed_default: 'doctrine://default?queue_name=failed_default'
            failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'

如果沒有全域或在傳輸方式層級定義 failure_transport,則訊息將在重試次數後被丟棄。

失敗命令有一個可選選項 --transport,用於指定在傳輸方式層級設定的 failure_transport

1
2
3
4
5
6
7
8
# see all messages in "failure_transport" transport
$ php bin/console messenger:failed:show --transport=failure_transport

# retry specific messages from "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force

# remove a message without retrying it from "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport

傳輸器設定

Messenger 支援多種不同的傳輸方式類型,每種類型都有自己的選項。選項可以透過 DSN 字串或設定傳遞到傳輸方式。

1
2
# .env
MESSENGER_TRANSPORT_DSN=amqp://127.0.0.1/%2f/messages?auto_setup=false
1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
                options:
                    auto_setup: false

options 下定義的選項優先於在 DSN 中定義的選項。

AMQP 傳輸器

AMQP 傳輸方式使用 AMQP PHP 擴充功能將訊息發送到 RabbitMQ 等佇列。透過運行以下命令安裝它

1
$ composer require symfony/amqp-messenger

AMQP 傳輸方式 DSN 可能看起來像這樣

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages

# or use the AMQPS protocol
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages

如果您想使用 TLS/SSL 加密的 AMQP,您還必須提供 CA 憑證。在 amqp.cacert PHP.ini 設定(例如 amqp.cacert = /etc/ssl/certs)或 DSN 的 cacert 參數(例如 amqps://127.0.0.1?cacert=/etc/ssl/certs/)中定義憑證路徑。

TLS/SSL 加密 AMQP 使用的預設埠是 5671,但您可以在 DSN 的 port 參數中覆寫它(例如 amqps://127.0.0.1?cacert=/etc/ssl/certs/&port=12345)。

注意

預設情況下,傳輸方式將自動建立所需的任何交換器、佇列和綁定金鑰。可以停用此功能,但某些功能可能無法正常運作(例如延遲佇列)。若要不自動建立任何佇列,您可以將傳輸方式設定為 queues: []

注意

您可以限制 AMQP 傳輸方式的消費者僅處理來自交換器某些佇列的訊息。請參閱 Messenger:同步和佇列訊息處理

傳輸方式還有許多其他選項,包括設定交換器、佇列綁定金鑰等的方法。請參閱 Connection 上的文件。

傳輸方式有多個選項

auto_setup (預設值:true)
是否應在發送/取得期間自動建立交換器和佇列。
cacert
PEM 格式的 CA 憑證檔案路徑。
cert
PEM 格式的用戶端憑證路徑。
channel_max
指定伺服器允許的最高通道號碼。0 表示標準擴充功能限制
confirm_timeout
確認逾時時間(以秒為單位);如果未指定,傳輸方式將不會等待訊息確認。注意:0 秒或更長。可以是小數。
connect_timeout
連線逾時時間。注意:0 秒或更長。可以是小數。
frame_max
伺服器針對連線提出的最大幀大小,包括幀標頭和結束位元組。0 表示標準擴充功能限制(取決於 librabbimq 預設幀大小限制)
heartbeat
伺服器想要的連線心跳延遲時間(以秒為單位)。0 表示伺服器不想要心跳。注意,librabbitmq 具有有限的心跳支援,這表示僅在封鎖呼叫期間檢查心跳。
host
AMQP 服務的主機名稱
key
PEM 格式的用戶端金鑰路徑。
login
用於連線 AMQP 服務的使用者名稱
password
用於連線 AMQP 服務的密碼
persistent (預設值:'false')
連線是否為持久連線
port
AMQP 服務的埠
read_timeout
輸入活動的逾時時間。注意:0 秒或更長。可以是小數。
retry
(無描述)
sasl_method
(無描述)
connection_name
用於自訂連線名稱(至少需要 PHP AMQP 擴充功能的 1.10 版本)
verify
啟用或停用同儕驗證。如果啟用同儕驗證,則伺服器憑證中的通用名稱必須與伺服器名稱相符。預設情況下啟用同儕驗證。
vhost
要與 AMQP 服務一起使用的虛擬主機
write_timeout
輸出活動的逾時時間。注意:0 秒或更長。可以是小數。
delay[queue_name_pattern] (預設值:delay_%exchange_name%_%routing_key%_%delay%)
用於建立佇列的模式
delay[exchange_name] (預設值:delays)
用於延遲/重試訊息的交換器名稱
queues[name][arguments]
額外引數
queues[name][binding_arguments]
綁定佇列時要使用的引數。
queues[name][binding_keys]
要綁定到此佇列的綁定金鑰(如果有的話)
queues[name][flags] (預設值:AMQP_DURABLE)
佇列旗標
exchange[arguments]
交換器的額外引數(例如 alternate-exchange
exchange[default_publish_routing_key]
發佈時要使用的路由金鑰,如果訊息上未指定任何路由金鑰
exchange[flags] (預設值:AMQP_DURABLE)
交換器旗標
exchange[name]
交換器的名稱
exchange[type] (預設值:fanout)
交換器的類型

您也可以透過將 AmqpStamp 新增到您的 Envelope,在您的訊息上設定 AMQP 特有的設定

1
2
3
4
5
6
7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...

$attributes = [];
$bus->dispatch(new SmsNotification(), [
    new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);

警告

消費者不會顯示在管理面板中,因為此傳輸方式不依賴於會封鎖的 \AmqpQueue::consume()。擁有封鎖接收器會使 messenger:consume 命令的 --time-limit/--memory-limit 選項以及 messenger:stop-workers 命令效率低下,因為它們都依賴於接收器立即返回的事實,無論它是否找到訊息。消費 worker 負責迭代,直到它收到要處理的訊息和/或直到達到其中一個停止條件。因此,如果 worker 卡在封鎖呼叫中,則無法達到 worker 的停止邏輯。

提示

如果您的應用程式遇到 socket 例外或 高連線流失(由連線的快速建立和刪除顯示),請考慮使用 AMQProxy。此工具充當 Symfony Messenger 和 AMQP 伺服器之間的閘道,維護穩定的連線並最大限度地減少開銷(這也提高了整體效能)。

Doctrine 傳輸器

Doctrine 傳輸方式可用於將訊息儲存在資料庫表格中。透過運行以下命令安裝它

1
$ composer require symfony/doctrine-messenger

Doctrine 傳輸方式 DSN 可能看起來像這樣

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default

格式為 doctrine://<connection_name>,如果您有多個連線並且想要使用「default」以外的連線。傳輸方式將自動建立一個名為 messenger_messages 的表格。

如果您想變更預設表格名稱,請使用 table_name 選項在 DSN 中傳遞自訂表格名稱

1
2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default?table_name=your_custom_table_name

或者,若要自行建立表格,請將 auto_setup 選項設定為 false產生遷移

傳輸方式有多個選項

table_name (預設值:messenger_messages)
表格名稱
queue_name (預設值:default)
佇列名稱(表格中的一個欄位,用於將一個表格用於多個傳輸方式)
redeliver_timeout (預設值:3600)

重試佇列中但處於「處理中」狀態的訊息之前的逾時時間(如果 worker 因故停止,則會發生這種情況,最終您應該重試訊息) - 以秒為單位。

注意

redeliver_timeout 設定為大於您最慢訊息持續時間的值。否則,某些訊息將在第一個訊息仍在處理時第二次啟動。

auto_setup
是否應在發送/取得期間自動建立表格。

使用 PostgreSQL 時,您可以存取以下選項以利用 LISTEN/NOTIFY 功能。與 Doctrine 傳輸方式的預設輪詢行為相比,這允許更高效的方法,因為 PostgreSQL 將在表格中插入新訊息時直接通知 workers。

use_notify (預設值:true)
是否使用 LISTEN/NOTIFY。
check_delayed_interval (預設值:60000)
檢查延遲訊息的間隔(以毫秒為單位)。設定為 0 以停用檢查。
get_notify_timeout (預設值:0)
呼叫 PDO::pgsqlGetNotify 時等待回應的時間長度(以毫秒為單位)。

Beanstalkd 傳輸器

Beanstalkd 傳輸方式將訊息直接發送到 Beanstalkd 工作佇列。透過運行以下命令安裝它

1
$ composer require symfony/beanstalkd-messenger

Beanstalkd 傳輸方式 DSN 可能看起來像這樣

1
2
3
4
5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://127.0.0.1:11300?tube_name=foo&timeout=4&ttr=120

# If no port, it will default to 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://127.0.0.1

傳輸方式有多個選項

tube_name (預設值:default)
佇列名稱
timeout (預設值:0)
訊息預約逾時時間 - 以秒為單位。0 將導致伺服器立即傳回回應,否則將拋出 TransportException。
ttr (預設值:90)
訊息在放回就緒佇列之前要運行的時間 - 以秒為單位。

7.2

在 Symfony 7.2 中新增了使用 --keepalive 選項的 Keepalive 支援。

Redis 傳輸器

Redis 傳輸方式使用 streams 來佇列訊息。此傳輸方式需要 Redis PHP 擴充功能(>=4.3)和正在運行的 Redis 伺服器(^5.0)。透過運行以下命令安裝它

1
$ composer require symfony/redis-messenger

Redis 傳輸方式 DSN 可能看起來像這樣

1
2
3
4
5
6
7
8
9
10
11
12
# .env
MESSENGER_TRANSPORT_DSN=redis://127.0.0.1:6379/messages
# Full DSN Example
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
# Redis Cluster Example
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Unix Socket Example
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
# TLS Example
MESSENGER_TRANSPORT_DSN=rediss://127.0.0.1:6379/messages
# Multiple Redis Sentinel Hosts Example
MESSENGER_TRANSPORT_DSN=redis:?host[redis1:26379]&host[redis2:26379]&host[redis3:26379]&sentinel_master=db

可以透過 DSN 或 messenger.yaml 中傳輸方式下的 options 鍵來設定多個選項

stream (預設值:messages)
Redis stream 名稱
group (預設值:symfony)
Redis 消費者群組名稱
consumer (預設值:consumer)
Redis 中使用的消費者名稱
auto_setup (預設值:true)
是否自動建立 Redis 群組
auth
Redis 密碼
delete_after_ack (預設值:true)
如果 true,訊息會在處理後自動刪除
delete_after_reject (預設值:true)
如果 true,訊息會在被拒絕後自動刪除
lazy (預設值:false)
僅在真正需要連線時才連線
serializer (預設值:Redis::SERIALIZER_PHP)
如何在 Redis 中序列化最終酬載(Redis::OPT_SERIALIZER 選項)
stream_max_entries (預設值:0)
stream 將被修剪到的最大條目數。將其設定為足夠大的數字,以避免遺失待處理的訊息
redeliver_timeout (預設值:3600)
在重試由已放棄的消費者擁有的待處理訊息之前的逾時時間(以秒為單位)(如果 worker 因故死掉,則會發生這種情況,最終您應該重試訊息)。
claim_interval (預設值:60000)
應檢查待處理/已放棄訊息以進行聲明的間隔 - 以毫秒為單位
persistent_id (預設值:null)
字串,如果為 null,則連線為非持久連線。
retry_interval (預設值:0)
整數,值以毫秒為單位
read_timeout (預設值:0)
浮點數,值以秒為單位,預設值表示無限
timeout (預設值:0)
連線逾時時間。浮點數,值以秒為單位,預設值表示無限
sentinel_master (預設值:null)
字串,如果為 null 或空字串,則停用 Sentinel 支援
redis_sentinel (預設值:null)

sentinel_master 選項的別名

7.1

redis_sentinel 選項是在 Symfony 7.1 中引入的。

ssl (預設值:null)

SSL 上下文選項的對應,用於 TLS 通道。例如,這對於變更測試中 TLS 通道的需求很有用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            redis:
                dsn: "rediss://127.0.0.1"
                options:
                    ssl:
                        allow_self_signed: true
                        capture_peer_cert: true
                        capture_peer_cert_chain: true
                        disable_compression: true
                        SNI_enabled: true
                        verify_peer: true
                        verify_peer_name: true

警告

對於相同的 streamgroupconsumer 組合,永遠不應有多個 messenger:consume 命令在運行,否則訊息最終可能會被多次處理。如果您運行多個佇列 workers,consumer 可以設定為環境變數,例如 %env(MESSENGER_CONSUMER_NAME)%,由 Supervisor(如下例)或用於管理 worker 程序的任何其他服務設定。在容器環境中,HOSTNAME 可以用作消費者名稱,因為每個容器/主機只有一個 worker。如果使用 Kubernetes 來協調容器,請考慮使用 StatefulSet 以獲得穩定的名稱。

提示

delete_after_ack 設定為 true(如果您使用單一群組)或定義 stream_max_entries(如果您可以估計在您的情況下可接受的最大條目數),以避免記憶體洩漏。否則,所有訊息將永遠保留在 Redis 中。

記憶體內傳輸器

in-memory 傳輸方式實際上不會傳遞訊息。相反,它會在請求期間將它們保存在記憶體中,這對於測試很有用。例如,如果您有 async_priority_normal 傳輸方式,您可以在 test 環境中覆寫它以使用此傳輸方式

1
2
3
4
5
# config/packages/test/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: 'in-memory://'

然後,在測試時,訊息不會傳遞到真實的傳輸方式。更好的是,在測試中,您可以檢查在請求期間是否只發送了一條訊息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;

use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;

class DefaultControllerTest extends WebTestCase
{
    public function testSomething(): void
    {
        $client = static::createClient();
        // ...

        $this->assertSame(200, $client->getResponse()->getStatusCode());

        /** @var InMemoryTransport $transport */
        $transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
        $this->assertCount(1, $transport->getSent());
    }
}

傳輸方式有多個選項

serialize (布林值,預設值:false)
是否序列化訊息。這對於測試額外層很有用,尤其是在您使用自己的訊息序列化器時。

注意

所有 in-memory 傳輸方式將在每個測試後自動重設,擴充 KernelTestCaseWebTestCase 的測試類別中。

Amazon SQS

Amazon SQS 傳輸方式非常適合託管在 AWS 上的應用程式。透過運行以下命令安裝它

1
$ composer require symfony/amazon-sqs-messenger

SQS 傳輸方式 DSN 可能看起來像這樣

1
2
3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://127.0.0.1:9494/messages?sslmode=disable

注意

傳輸方式將自動建立所需的佇列。可以透過將 auto_setup 選項設定為 false 來停用此功能。

提示

在發送或接收訊息之前,Symfony 需要透過在 AWS 中呼叫 GetQueueUrl API 將佇列名稱轉換為 AWS 佇列 URL。可以透過提供作為佇列 URL 的 DSN 來避免此額外的 API 呼叫。

傳輸方式有多個選項

access_key
AWS 存取金鑰(必須進行 URL 編碼)
account (預設值:憑證的擁有者)
AWS 帳戶的識別碼
auto_setup (預設值:true)
佇列是否應在傳送/接收期間自動建立。
buffer_size (預設值:9)
要預先提取的訊息數量
debug (預設值:false)
若為 true,則記錄所有 HTTP 請求和回應 (會影響效能)
endpoint (預設值:https://sqs.eu-west-1.amazonaws.com)
SQS 服務的絕對 URL
poll_timeout (預設值:0.1)
等待新訊息的持續時間 (秒)
queue_name (預設值:messages)
佇列名稱
region (預設值:eu-west-1)
AWS 區域的名稱
secret_key
AWS 密鑰 (必須進行 URL 編碼)
session_token
AWS 工作階段權杖
visibility_timeout (預設值:佇列的組態)
訊息將不可見的秒數 (可見性逾時)
wait_time (預設值:20)
長輪詢持續時間 (秒)

注意

wait_time 參數定義 Amazon SQS 在傳送回應之前,應等待佇列中訊息可用的最長持續時間。這有助於減少使用 Amazon SQS 的成本,方法是消除空回應的數量。

poll_timeout 參數定義接收器應等待多久才傳回 null 的持續時間。這可避免封鎖其他接收器被呼叫。

注意

如果佇列名稱的後綴為 .fifo,AWS 將建立 FIFO 佇列。使用戳記 AmazonSqsFifoStamp 定義訊息群組 ID訊息重複資料刪除 ID

另一種可能性是啟用 AddFifoStampMiddleware。如果您的訊息實作 MessageDeduplicationAwareInterface,中介軟體將自動新增 AmazonSqsFifoStamp 並設定訊息重複資料刪除 ID。此外,如果您的訊息實作 MessageGroupAwareInterface,中介軟體將自動設定戳記的訊息群組 ID

您可以在專門章節中了解更多關於中介軟體的資訊。

FIFO 佇列不支援為每個訊息設定延遲,重試策略設定中需要 delay: 0 的值。

7.2

使用 `--keepalive` 選項的 Keepalive 支援已在 Symfony 7.2 中新增。

序列化訊息

當訊息傳送至 (和接收自) 傳輸時,它們會使用 PHP 的原生 serialize() & unserialize() 函數進行序列化。您可以將此全域 (或針對每個傳輸) 變更為實作 SerializerInterface 的服務

1
2
3
4
5
6
7
8
9
10
11
12
13
# config/packages/messenger.yaml
framework:
    messenger:
        serializer:
            default_serializer: messenger.transport.symfony_serializer
            symfony_serializer:
                format: json
                context: { }

        transports:
            async_priority_normal:
                dsn: # ...
                serializer: messenger.transport.symfony_serializer

messenger.transport.symfony_serializer 是一個內建服務,它使用 Serializer 組件,並且可以用幾種方式進行組態。如果您確實選擇使用 Symfony serializer,您可以透過 SerializerStamp,依個案控制內容 (請參閱信封 & 戳記)。

提示

當將訊息傳送/接收到/從另一個應用程式時,您可能需要更精細地控制序列化程序。使用自訂序列化器可提供該控制。請參閱 SymfonyCasts 的訊息序列化器教學課程 以了解詳細資訊。

執行命令和外部程序

觸發命令

可以透過分派 RunCommandMessage 來觸發任何命令。Symfony 將負責處理此訊息並執行傳遞至訊息參數的命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Console\Messenger\RunCommandMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class CleanUpService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function cleanUp(): void
    {
        // Long task with some caching...

        // Once finished, dispatch some clean up commands
        $this->bus->dispatch(new RunCommandMessage('app:my-cache:clean-up --dir=var/temp'));
        $this->bus->dispatch(new RunCommandMessage('cache:clear'));
    }
}

您可以組態在命令執行期間發生錯誤時的行為。若要執行此操作,您可以在建立 RunCommandMessage 實例時,使用 throwOnFailurecatchExceptions 參數。

處理完成後,處理常式將傳回 RunCommandContext,其中包含許多有用的資訊,例如結束代碼或程序的輸出。您可以參閱關於處理常式結果的專門頁面以取得更多資訊。

觸發外部程序

Messenger 隨附一個方便的輔助程式,可透過分派訊息來執行外部程序。這利用了 Process 組件。透過分派 RunProcessMessage,Messenger 將負責使用您傳遞的參數建立新的程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Process\Messenger\RunProcessMessage;

class CleanUpService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function cleanUp(): void
    {
        $this->bus->dispatch(new RunProcessMessage(['rm', '-rf', 'var/log/temp/*'], cwd: '/my/custom/working-dir'));

        // ...
    }
}

處理完成後,處理常式將傳回 RunProcessContext,其中包含許多有用的資訊,例如結束代碼或程序的輸出。您可以參閱關於處理常式結果的專門頁面以取得更多資訊。

Ping Web 服務

有時,您可能需要定期 ping Web 服務以取得其狀態,例如,它是啟動還是關閉。可以透過分派 PingWebhookMessage 來執行此操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use Symfony\Component\HttpClient\Messenger\PingWebhookMessage;
use Symfony\Component\Messenger\MessageBusInterface;

class LivenessService
{
    public function __construct(private readonly MessageBusInterface $bus)
    {
    }

    public function ping(): void
    {
        // An HttpExceptionInterface is thrown on 3xx/4xx/5xx
        $this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status'));

        // Ping, but does not throw on 3xx/4xx/5xx
        $this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status', throw: false));

        // Any valid HttpClientInterface option can be used
        $this->bus->dispatch(new PingWebhookMessage('POST', 'https://example.com/status', [
            'headers' => [
                'Authorization' => 'Bearer ...'
            ],
            'json' => [
                'data' => 'some-data',
            ],
        ]));
    }
}

處理常式將傳回 ResponseInterface,讓您可以收集和處理 HTTP 請求傳回的資訊。

從您的處理器取得結果

當訊息被處理時,HandleMessageMiddleware 會為每個處理訊息的物件新增 HandledStamp。您可以使用它來取得處理常式傳回的值

1
2
3
4
5
6
7
8
9
10
11
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

$envelope = $messageBus->dispatch(new SomeMessage());

// get the value that was returned by the last message handler
$handledStamp = $envelope->last(HandledStamp::class);
$handledStamp->getResult();

// or get info about all of handlers
$handledStamps = $envelope->all(HandledStamp::class);

使用命令 & 查詢總線時取得結果

Messenger 組件可用於 CQRS 架構中,其中命令和查詢匯流排是應用程式的核心組件。請閱讀 Martin Fowler 的 關於 CQRS 的文章以了解更多資訊,以及如何組態多個匯流排

由於查詢通常是同步的且預期只處理一次,因此從處理常式取得結果是常見的需求。

存在 HandleTrait,可在同步處理時取得處理常式的結果。它也確保只註冊一個處理常式。HandleTrait 可用於具有 $messageBus 屬性的任何類別中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// src/Action/ListItems.php
namespace App\Action;

use App\Message\ListItemsQuery;
use App\MessageHandler\ListItemsQueryResult;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

class ListItems
{
    use HandleTrait;

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
    }

    public function __invoke(): void
    {
        $result = $this->query(new ListItemsQuery(/* ... */));

        // Do something with the result
        // ...
    }

    // Creating such a method is optional, but allows type-hinting the result
    private function query(ListItemsQuery $query): ListItemsQueryResult
    {
        return $this->handle($query);
    }
}

因此,您可以使用 trait 來建立命令和查詢匯流排類別。例如,您可以建立一個特殊的 QueryBus 類別,並在任何需要查詢匯流排行為的地方注入它,而不是 MessageBusInterface

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// src/MessageBus/QueryBus.php
namespace App\MessageBus;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

class QueryBus
{
    use HandleTrait;

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
    }

    /**
     * @param object|Envelope $query
     *
     * @return mixed The handler returned value
     */
    public function query($query): mixed
    {
        return $this->handle($query);
    }
}

自訂處理器

手動設定處理器

Symfony 通常會自動尋找並註冊您的處理常式。但是,您也可以在使用 #AsMessageHandler 屬性或使用 messenger.message_handler 標籤標記處理常式服務時,手動組態處理常式,並傳遞一些額外的組態。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message): void
    {
        // ...
    }
}

使用標籤組態的可能選項為

bus
處理常式可以從中接收訊息的匯流排名稱,預設為所有匯流排。
from_transport
處理常式可以從中接收訊息的傳輸名稱,預設為所有傳輸。
handles
可以由處理常式處理的訊息類型 (FQCN),只有在無法透過類型提示猜測時才需要。
method
將處理訊息的方法名稱。
priority
當多個處理常式可以處理相同的訊息時,處理常式的優先順序。

處理多個訊息

單一處理常式類別可以處理多個訊息。為此,請將 #AsMessageHandler 屬性新增至所有處理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;

class SmsNotificationHandler
{
    #[AsMessageHandler]
    public function handleSmsNotification(SmsNotification $message): void
    {
        // ...
    }

    #[AsMessageHandler]
    public function handleOtherSmsNotification(OtherSmsNotification $message): void
    {
        // ...
    }
}

事務訊息:在處理完成後處理新訊息

訊息處理常式可以在處理其他訊息時分派新訊息,到相同或不同的匯流排 (如果應用程式有多個匯流排)。在此過程中發生的任何錯誤或例外狀況都可能產生意想不到的後果,例如

  1. 如果使用 DoctrineTransactionMiddleware 且分派的訊息擲回例外狀況,則原始處理常式中的任何資料庫交易都將被回滾。
  2. 如果訊息被分派到不同的匯流排,則即使目前處理常式稍後的某些程式碼擲回例外狀況,分派的訊息仍將被處理。

範例 RegisterUser 程序

考慮一個同時具有命令事件匯流排的應用程式。應用程式將名為 RegisterUser 的命令分派到命令匯流排。RegisterUserHandler 處理常式會處理該命令,該處理常式會建立 User 物件、將該物件儲存到資料庫,並將 UserRegistered 訊息分派到事件匯流排。

有許多 UserRegistered 訊息的處理常式,其中一個處理常式可能會向新使用者傳送歡迎電子郵件。我們正在使用 DoctrineTransactionMiddleware 將所有資料庫查詢包裝在一個資料庫交易中。

問題 1:如果在傳送歡迎電子郵件時擲回例外狀況,則使用者將不會被建立,因為 DoctrineTransactionMiddleware 將回滾 Doctrine 交易,使用者已在其中建立。

問題 2:如果在將使用者儲存到資料庫時擲回例外狀況,則歡迎電子郵件仍會傳送,因為它是非同步處理的。

DispatchAfterCurrentBusMiddleware 中介軟體

對於許多應用程式而言,所需的行為是在處理常式完全完成後,才處理由處理常式分派的訊息。這可以使用 DispatchAfterCurrentBusMiddleware 並將 DispatchAfterCurrentBusStamp 戳記新增至 訊息信封 來完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;

use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

class RegisterUserHandler
{
    public function __construct(
        private MessageBusInterface $eventBus,
        private EntityManagerInterface $em,
    ) {
    }

    public function __invoke(RegisterUser $command): void
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        // The DispatchAfterCurrentBusStamp marks the event message to be handled
        // only if this handler does not throw an exception.

        $event = new UserRegistered($command->getUuid());
        $this->eventBus->dispatch(
            (new Envelope($event))
                ->with(new DispatchAfterCurrentBusStamp())
        );

        // ...
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;

use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;

class WhenUserRegisteredThenSendWelcomeEmail
{
    public function __construct(
        private MailerInterface $mailer,
        EntityManagerInterface $em,
    ) {
    }

    public function __invoke(UserRegistered $event): void
    {
        $user = $this->em->getRepository(User::class)->find($event->getUuid());

        $this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
    }
}

這表示 UserRegistered 訊息將在 RegisterUserHandler 完成且新的 User 持續保存到資料庫之後才會被處理。如果 RegisterUserHandler 遇到例外狀況,則 UserRegistered 事件將永遠不會被處理。如果在傳送歡迎電子郵件時擲回例外狀況,則 Doctrine 交易將不會被回滾。

注意

如果 WhenUserRegisteredThenSendWelcomeEmail 擲回例外狀況,則該例外狀況將被包裝在 DelayedMessageHandlingException 中。使用 DelayedMessageHandlingException::getWrappedExceptions 將為您提供在使用 DispatchAfterCurrentBusStamp 處理訊息時擲回的所有例外狀況。

dispatch_after_current_bus 中介軟體預設為啟用。如果您要手動組態中介軟體,請務必在 middleware 鏈中於 doctrine_transaction 之前註冊 dispatch_after_current_bus。此外,所有正在使用的匯流排都必須載入 dispatch_after_current_bus 中介軟體。

將處理器綁定到不同的傳輸器

每個訊息可以有多個處理常式,並且當訊息被取用時,會呼叫其所有處理常式。但是,您也可以組態處理常式僅在從特定傳輸接收到訊息時才被呼叫。這允許您擁有單一訊息,其中每個處理常式都由取用不同傳輸的不同「工作者」呼叫。

假設您有一個具有兩個處理常式的 UploadedImage 訊息

  • ThumbnailUploadedImageHandler:您希望由名為 image_transport 的傳輸來處理
  • NotifyAboutNewUploadedImageHandler:您希望由名為 async_priority_normal 的傳輸來處理

若要執行此操作,請將 from_transport 選項新增至每個處理常式。例如

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;

use App\Message\UploadedImage;

#[AsMessageHandler(fromTransport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
    public function __invoke(UploadedImage $uploadedImage): void
    {
        // do some thumbnailing
    }
}

並且類似地

1
2
3
4
5
6
7
8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...

#[AsMessageHandler(fromTransport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
    // ...
}

然後,請確保將您的訊息「路由」到兩個傳輸

1
2
3
4
5
6
7
8
9
10
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async_priority_normal: # ...
            image_transport: # ...

        routing:
            # ...
            'App\Message\UploadedImage': [image_transport, async_priority_normal]

就是這樣!您現在可以取用每個傳輸

1
2
3
4
# will only call ThumbnailUploadedImageHandler when handling the message
$ php bin/console messenger:consume image_transport -vv

$ php bin/console messenger:consume async_priority_normal -vv

警告

如果處理常式沒有 from_transport 組態,則它將在訊息接收自的每個傳輸上執行。

批次處理訊息

您可以宣告「特殊」處理常式,這些處理常式將分批處理訊息。透過這樣做,處理常式將等待一定數量的訊息處於擱置狀態,然後再處理它們。批次處理常式的宣告是透過實作 BatchHandlerInterface 來完成的。BatchHandlerTrait 也提供用於簡化這些特殊處理常式的宣告

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

class MyBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(MyMessage $message, ?Acknowledger $ack = null): mixed
    {
        return $this->handle($message, $ack);
    }

    private function process(array $jobs): void
    {
        foreach ($jobs as [$message, $ack]) {
            try {
                // Compute $result from $message...

                // Acknowledge the processing of the message
                $ack->ack($result);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }

    // Optionally, you can override some of the trait methods, such as the
    // `getBatchSize()` method, to specify your own batch size...
    private function getBatchSize(): int
    {
        return 100;
    }
}

注意

__invoke()$ack 引數為 null 時,預期訊息會同步處理。否則,預期 __invoke() 會傳回擱置訊息的數量。BatchHandlerTrait 會為您處理此問題。

注意

預設情況下,擱置的批次會在工作者閒置以及停止時刷新。

擴展 Messenger

信封 & 戳記

訊息可以是任何 PHP 物件。有時,您可能需要組態關於訊息的一些額外資訊,例如它在 AMQP 中的處理方式,或在訊息應被處理之前新增延遲。您可以透過將「戳記」新增至您的訊息來執行此操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

public function index(MessageBusInterface $bus): void
{
    // wait 5 seconds before processing
    $bus->dispatch(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]);

    // or explicitly create an Envelope
    $bus->dispatch(new Envelope(new SmsNotification('...'), [
        new DelayStamp(5000),
    ]));

    // ...
}

在內部,每個訊息都包裝在 Envelope 中,它包含訊息和戳記。您可以手動建立此信封,或允許訊息匯流排執行此操作。有多種不同的戳記用於不同的目的,它們在內部用於追蹤關於訊息的資訊,例如處理它的訊息匯流排,或它是否在失敗後正在重試。

中介層

當您將訊息分派到訊息匯流排時會發生什麼情況,取決於其中介軟體的集合及其順序。預設情況下,為每個匯流排組態的中介軟體如下所示

  1. add_bus_name_stamp_middleware - 新增戳記以記錄此訊息分派到哪個匯流排;
  2. dispatch_after_current_bus - 請參閱Messenger:同步 & 佇列訊息處理
  3. failed_message_processing_middleware - 處理透過失敗傳輸重試的訊息,使其正常運作,就像它們是從原始傳輸接收到的一樣;
  4. 您自己的中介軟體集合;
  5. send_message - 如果為傳輸組態了路由,則此選項會將訊息傳送至該傳輸並停止中介軟體鏈;
  6. handle_message - 呼叫給定訊息的訊息處理常式。

注意

這些中介軟體名稱實際上是捷徑名稱。真正的服務 ID 以 messenger.middleware. 為字首 (例如 messenger.middleware.handle_message)。

中介軟體在訊息分派時執行,但在透過工作者接收訊息時再次執行 (對於傳送至傳輸以進行非同步處理的訊息)。如果您建立自己的中介軟體,請記住這一點。

您可以將自己的中介軟體新增至此清單,或完全停用預設中介軟體,並包含您自己的中介軟體。

如果中介軟體服務是抽象的,您可以組態其建構子的引數,並且每個匯流排都會建立不同的實例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            messenger.bus.default:
                # disable the default middleware
                default_middleware: false

                middleware:
                    # use and configure parts of the default middleware you want
                    - 'add_bus_name_stamp_middleware': ['messenger.bus.default']

                    # add your own services that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
                    - 'App\Middleware\MyMiddleware'
                    - 'App\Middleware\AnotherMiddleware'

提示

如果您已安裝 MakerBundle,則可以使用 make:messenger-middleware 命令來引導建立您自己的 messenger 中介軟體。

Doctrine 的中介層

如果您在應用程式中使用 Doctrine,則存在許多您可能想要使用的選用中介軟體

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    # each time a message is handled, the Doctrine connection
                    # is "pinged" and reconnected if it's closed. Useful
                    # if your workers run for a long time and the database
                    # connection is sometimes lost
                    - doctrine_ping_connection

                    # After handling, the Doctrine connection is closed,
                    # which can free up database connections in a worker,
                    # instead of keeping them open forever
                    - doctrine_close_connection

                    # logs an error when a Doctrine transaction was opened but not closed
                    - doctrine_open_transaction_logger

                    # wraps all handlers in a single Doctrine transaction
                    # handlers do not need to call flush() and an error
                    # in any handler will cause a rollback
                    - doctrine_transaction

                    # or pass a different entity manager to any
                    #- doctrine_transaction: ['custom']

其他中介層

如果您需要在消費者中產生絕對 URL (例如,使用連結呈現範本),請新增 router_context 中介軟體。此中介軟體會儲存建立絕對 URL 時所需的原始請求內容 (即主機、HTTP 連接埠等)。

如果您需要在處理訊息物件之前,使用 Validator 組件驗證訊息物件,請新增 validation 中介軟體。如果驗證失敗,將擲回 ValidationFailedExceptionValidationStamp 可用於組態驗證群組。

1
2
3
4
5
6
7
8
# config/packages/messenger.yaml
framework:
    messenger:
        buses:
            command_bus:
                middleware:
                    - router_context
                    - validation

Messenger 事件

除了中介軟體之外,Messenger 還會分派數個事件。您可以建立事件接聽器以掛鉤到程序的各個部分。對於每個事件,事件類別都是事件名稱

額外處理器引數

可以使用 HandlerArgumentsStamp,讓 messenger 將額外資料傳遞至訊息處理常式。將此戳記新增至中介軟體中的信封,並填入您希望在處理常式中可用的任何額外資料

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// src/Messenger/AdditionalArgumentMiddleware.php
namespace App\Messenger;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;

final class AdditionalArgumentMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $envelope = $envelope->with(new HandlerArgumentsStamp([
            $this->resolveAdditionalArgument($envelope->getMessage()),
        ]));

        return $stack->next()->handle($envelope, $stack);
    }

    private function resolveAdditionalArgument(object $message): mixed
    {
        // ...
    }
}

然後您的處理常式將如下所示

1
2
3
4
5
6
7
8
9
10
11
12
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;

final class SmsNotificationHandler
{
    public function __invoke(SmsNotification $message, mixed $additionalArgument)
    {
        // ...
    }
}

自訂資料格式的訊息序列化器

如果您從其他應用程式接收訊息,則它們的格式可能與您需要的格式不完全相同。並非所有應用程式都會傳回具有 bodyheaders 欄位的 JSON 訊息。在這些情況下,您需要建立新的訊息序列化器,以實作 SerializerInterface。假設您想要建立訊息解碼器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
namespace App\Messenger\Serializer;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class MessageWithTokenDecoder implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        try {
            // parse the data you received with your custom fields
            $data = $encodedEnvelope['data'];
            $data['token'] = $encodedEnvelope['token'];

            // other operations like getting information from stamps
        } catch (\Throwable $throwable) {
            // wrap any exception that may occur in the envelope to send it to the failure transport
            return new Envelope($throwable);
        }

        return new Envelope($data);
    }

    public function encode(Envelope $envelope): array
    {
        // this decoder does not encode messages, but you can implement it by returning
        // an array with serialized stamps if you need to send messages in a custom format
        throw new \LogicException('This serializer is only used for decoding messages.');
    }
}

下一步是告訴 Symfony 在您的一個或多個傳輸中使用此序列化器

1
2
3
4
5
6
7
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_transport:
                dsn: '%env(MY_TRANSPORT_DSN)%'
                serializer: 'App\Messenger\Serializer\MessageWithTokenDecoder'

多個總線、命令 & 事件總線

Messenger 預設為您提供單一訊息匯流排服務。但是,您可以組態任意數量的訊息匯流排,建立「命令」、「查詢」或「事件」匯流排,並控制它們的中介軟體。

建置應用程式時的常見架構是將命令與查詢分開。命令是執行某些操作的動作,而查詢則擷取資料。這稱為 CQRS (命令查詢職責分離)。請參閱 Martin Fowler 的 關於 CQRS 的文章以了解更多資訊。此架構可以與 Messenger 組件結合使用,方法是定義多個匯流排。

命令匯流排查詢匯流排略有不同。例如,命令匯流排通常不提供任何結果,而查詢匯流排很少是非同步的。您可以使用中介軟體組態這些匯流排及其規則。

透過引入事件匯流排來將動作與反應分開也可能是個好主意。事件匯流排可以有零個或多個訂閱者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
framework:
    messenger:
        # The bus that is going to be injected when injecting MessageBusInterface
        default_bus: command.bus
        buses:
            command.bus:
                middleware:
                    - validation
                    - doctrine_transaction
            query.bus:
                middleware:
                    - validation
            event.bus:
                default_middleware:
                    enabled: true
                    # set "allow_no_handlers" to true (default is false) to allow having
                    # no handler configured for this bus without throwing an exception
                    allow_no_handlers: false
                    # set "allow_no_senders" to false (default is true) to throw an exception
                    # if no sender is configured for this bus
                    allow_no_senders: true
                middleware:
                    - validation

這將建立三個新服務

  • command.bus:可使用 MessageBusInterface 類型提示自動連線 (因為這是 default_bus);
  • query.bus:可使用 MessageBusInterface $queryBus 自動連線;
  • event.bus:可使用 MessageBusInterface $eventBus 自動連線。

限制每個總線的處理器

預設情況下,每個處理常式都可以在所有匯流排上處理訊息。若要防止在沒有錯誤的情況下將訊息分派到錯誤的匯流排,您可以使用 messenger.message_handler 標籤將每個處理常式限制為特定的匯流排

1
2
3
4
# config/services.yaml
services:
    App\MessageHandler\SomeCommandHandler:
        tags: [{ name: messenger.message_handler, bus: command.bus }]

透過這種方式,App\MessageHandler\SomeCommandHandler 處理常式將僅為 command.bus 匯流排所知。

您也可以使用 _instanceof 服務組態,自動將此標籤新增至許多類別。使用此組態,您可以根據實作的介面判斷訊息匯流排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# config/services.yaml
services:
    # ...

    _instanceof:
        # all services implementing the CommandHandlerInterface
        # will be registered on the command.bus bus
        App\MessageHandler\CommandHandlerInterface:
            tags:
                - { name: messenger.message_handler, bus: command.bus }

        # while those implementing QueryHandlerInterface will be
        # registered on the query.bus bus
        App\MessageHandler\QueryHandlerInterface:
            tags:
                - { name: messenger.message_handler, bus: query.bus }

偵錯總線

debug:messenger 命令會列出每個匯流排可用的訊息和處理常式。您也可以透過提供匯流排名稱作為引數,將清單限制為特定的匯流排。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$ php bin/console debug:messenger

  Messenger
  =========

  command.bus
  -----------

   The following messages can be dispatched:

   ---------------------------------------------------------------------------------------
    App\Message\DummyCommand
        handled by App\MessageHandler\DummyCommandHandler
    App\Message\MultipleBusesMessage
        handled by App\MessageHandler\MultipleBusesMessageHandler
   ---------------------------------------------------------------------------------------

  query.bus
  ---------

   The following messages can be dispatched:

   ---------------------------------------------------------------------------------------
    App\Message\DummyQuery
        handled by App\MessageHandler\DummyQueryHandler
    App\Message\MultipleBusesMessage
        handled by App\MessageHandler\MultipleBusesMessageHandler
   ---------------------------------------------------------------------------------------

提示

該命令還將顯示訊息和處理常式類別的 PHPDoc 描述。

重新分派訊息

如果您想要重新分派訊息 (使用相同的傳輸和信封),請建立新的 RedispatchMessage 並透過您的匯流排分派它。重新使用先前顯示的相同 SmsNotification 範例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;

use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Message\RedispatchMessage;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsMessageHandler]
class SmsNotificationHandler
{
    public function __construct(private MessageBusInterface $bus)
    {
    }

    public function __invoke(SmsNotification $message): void
    {
        // do something with the message
        // then redispatch it based on your own logic

        if ($needsRedispatch) {
            $this->bus->dispatch(new RedispatchMessage($message));
        }
    }
}

內建的 RedispatchMessageHandler 將負責處理此訊息,以透過最初分派它的相同匯流排重新分派它。您也可以使用 RedispatchMessage 建構子的第二個引數來提供在重新分派訊息時要使用的傳輸。

這項工作 (包括程式碼範例) 根據 Creative Commons BY-SA 3.0 授權條款授權。
TOC
    版本