
How to Create Your own Messenger Transport


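Once you have written your transport's sender and receiver, you can register your transport factory so that the transport can be used via a DSN in the Symfony application.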

Create your Transport Factory

You need to give FrameworkBundle the opportunity to create your transport from a DSN. To do that, you will need a transport factory:

use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class YourTransportFactory implements TransportFactoryInterface
{
    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        return new YourTransport(/* ... */);
    }

    public function supports(string $dsn, array $options): bool
    {
        return str_starts_with($dsn, 'my-transport://');
    }
}
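
The factory receives the raw DSN string, so createTransport() usually has to extract connection details from it before building the transport. The following is a minimal sketch of one way to do that with PHP's built-in parse_url() and parse_str(). The function name parseMyTransportDsn, the keys of the returned array, and the example DSN are hypothetical and are not part of Symfony:

```php
<?php

/**
 * Hypothetical helper: splits a "my-transport://" DSN into its parts.
 * The returned keys are illustrative assumptions, not a Symfony convention.
 *
 * @return array{host: string, port: ?int, path: string, options: array<string, mixed>}
 */
function parseMyTransportDsn(string $dsn): array
{
    $parts = parse_url($dsn);

    // parse_url() returns false on seriously malformed input
    if (false === $parts || 'my-transport' !== ($parts['scheme'] ?? null)) {
        throw new InvalidArgumentException(sprintf('The DSN "%s" is not a valid my-transport DSN.', $dsn));
    }

    // Query-string parameters become transport options (values stay strings)
    $options = [];
    parse_str($parts['query'] ?? '', $options);

    return [
        'host' => $parts['host'] ?? 'localhost',
        'port' => $parts['port'] ?? null,
        'path' => ltrim($parts['path'] ?? '', '/'),
        'options' => $options,
    ];
}

// Example DSN is made up for illustration
$config = parseMyTransportDsn('my-transport://db.example:5432/my_queue?redeliver_timeout=300');
```

Inside createTransport(), the resulting array could be merged with the $options argument before being passed to the transport's constructor; which of the two takes precedence is a design decision left to your factory.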

The transport object needs to implement the TransportInterface (which combines the SenderInterface and ReceiverInterface). Here is a simplified example of a database transport:

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Uid\Uuid;

class YourTransport implements TransportInterface
{
    private SerializerInterface $serializer;

    /**
     * @param FakeDatabase $db is used for demo purposes. It is not a real class.
     */
    public function __construct(
        private FakeDatabase $db,
        ?SerializerInterface $serializer = null,
    ) {
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    public function get(): iterable
    {
        // Get a message from "my_queue"
        $row = $this->db->createQuery(
                'SELECT *
                FROM my_queue
                WHERE (delivered_at IS NULL OR delivered_at < :redeliver_timeout)
                AND handled = FALSE'
            )
            ->setParameter('redeliver_timeout', new \DateTimeImmutable('-5 minutes'))
            ->getOneOrNullResult();

        if (null === $row) {
            return [];
        }

        $envelope = $this->serializer->decode([
            'body' => $row['envelope'],
        ]);

        return [$envelope->with(new TransportMessageIdStamp($row['id']))];
    }

    public function ack(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Mark the message as "handled"
        $this->db->createQuery('UPDATE my_queue SET handled = TRUE WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function reject(Envelope $envelope): void
    {
        $stamp = $envelope->last(TransportMessageIdStamp::class);
        if (!$stamp instanceof TransportMessageIdStamp) {
            throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
        }

        // Delete the message from the "my_queue" table
        $this->db->createQuery('DELETE FROM my_queue WHERE id = :id')
            ->setParameter('id', $stamp->getId())
            ->execute();
    }

    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);
        $uuid = (string) Uuid::v4();
        // Add a message to the "my_queue" table
        $this->db->createQuery(
                'INSERT INTO my_queue (id, envelope, delivered_at, handled)
                VALUES (:id, :envelope, NULL, FALSE)'
            )
            ->setParameters([
                'id' => $uuid,
                'envelope' => $encodedMessage['body'],
            ])
            ->execute();

        return $envelope->with(new TransportMessageIdStamp($uuid));
    }
}

The implementation above is not runnable code, but it illustrates how a TransportInterface could be implemented. For real implementations, see InMemoryTransport and DoctrineReceiver.
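
To make the queue-table semantics of the example concrete without a database, here is a small self-contained sketch that mimics the delivered_at and handled columns in memory. The class InMemoryQueueSketch is hypothetical and does not implement any Symfony interface. Unlike the simplified example, its get() also records delivered_at, which is an assumption about how the redelivery timeout would be put to use:

```php
<?php

/**
 * Hypothetical in-memory stand-in for the "my_queue" table (not a Symfony class).
 * Timestamps are plain integers (seconds) so the behavior is easy to follow.
 */
final class InMemoryQueueSketch
{
    /** @var array<string, array{envelope: string, delivered_at: ?int, handled: bool}> */
    private array $rows = [];

    public function __construct(private int $redeliverTimeout = 300)
    {
    }

    /** Mirrors send(): store the encoded envelope and return its new id. */
    public function send(string $encodedEnvelope): string
    {
        $id = bin2hex(random_bytes(16)); // stand-in for a UUID
        $this->rows[$id] = ['envelope' => $encodedEnvelope, 'delivered_at' => null, 'handled' => false];

        return $id;
    }

    /** Mirrors get(): first unhandled row that was never delivered or has timed out. */
    public function get(int $now): ?array
    {
        foreach ($this->rows as $id => $row) {
            $timedOut = null !== $row['delivered_at'] && $row['delivered_at'] < $now - $this->redeliverTimeout;
            if (!$row['handled'] && (null === $row['delivered_at'] || $timedOut)) {
                $this->rows[$id]['delivered_at'] = $now; // assumption: mark as delivered

                return ['id' => $id, 'envelope' => $row['envelope']];
            }
        }

        return null;
    }

    /** Mirrors ack(): keep the row but mark it as handled. */
    public function ack(string $id): void
    {
        if (isset($this->rows[$id])) {
            $this->rows[$id]['handled'] = true;
        }
    }

    /** Mirrors reject(): remove the row entirely. */
    public function reject(string $id): void
    {
        unset($this->rows[$id]);
    }

    public function count(): int
    {
        return count($this->rows);
    }
}

$queue = new InMemoryQueueSketch(redeliverTimeout: 300);
$id = $queue->send('serialized-envelope');

$first = $queue->get(now: 1000);  // delivered for the first time
$second = $queue->get(now: 1100); // still within the timeout: nothing to deliver
$third = $queue->get(now: 1400);  // timeout elapsed: delivered again
$queue->ack($id);
$fourth = $queue->get(now: 2000); // handled rows are never returned
```

Note the difference between the two outcomes: ack() keeps the row and flags it as handled, while reject() deletes it, matching the UPDATE and DELETE queries in the example.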

Register your Factory

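Before using your factory, you must register it. If you are using the default services.yaml configuration, this is already done for you thanks to autoconfiguration. Otherwise, add the following: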

# config/services.yaml
services:
    Your\Transport\YourTransportFactory:
        tags: [messenger.transport_factory]
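
If your project defines its services in PHP rather than YAML, the same registration might look like the sketch below. It assumes the application loads config/services.php and reuses the Your\Transport\YourTransportFactory class name from the YAML example:

```php
<?php
// config/services.php
namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use Your\Transport\YourTransportFactory;

return static function (ContainerConfigurator $container): void {
    // Tag the factory so Messenger can ask it whether it supports a given DSN
    $container->services()
        ->set(YourTransportFactory::class)
        ->tag('messenger.transport_factory');
};
```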

Use your Transport

Within the framework.messenger.transports.* configuration, create your named transport using your own DSN:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            yours: 'my-transport://...'

In addition to being able to route your messages to the yours sender, this gives you access to the following services:

  1. messenger.sender.yours: the sender;
  2. messenger.receiver.yours: the receiver.

This work, including the code samples, is licensed under a Creative Commons BY-SA 3.0 license.