RabbitMQ
Движок работает с RabbitMQ.
RabbitMQ позволяет:
- Публиковать/подписываться на потоки данных.
- Обрабатывать потоки по мере их появления.
Создание таблицы
Обязательные параметры:
- rabbitmq_host_port– адрес сервера (- хост:порт). Например:- localhost:5672.
- rabbitmq_exchange_name– имя точки обмена в RabbitMQ.
- rabbitmq_format– формат сообщения. Используется такое же обозначение, как и в функции- FORMATв SQL, например,- JSONEachRow. Подробнее см. в разделе Форматы входных и выходных данных.
Дополнительные параметры:
- rabbitmq_exchange_type– тип точки обмена в RabbitMQ:- direct,- fanout,- topic,- headers,- consistent_hash. По умолчанию:- fanout.
- rabbitmq_routing_key_list– список ключей маршрутизации, через запятую.
- rabbitmq_row_delimiter– символ-разделитель, который завершает сообщение.
- rabbitmq_schema– опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap’n Proto требует путь к файлу со схемой и название корневого объекта- schema.capnp:Message.
- rabbitmq_num_consumers– количество потребителей на таблицу. По умолчанию:- 1. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.
- rabbitmq_num_queues– количество очередей. По умолчанию:- 1. Большее число очередей может сильно увеличить пропускную способность.
- rabbitmq_queue_base- настройка для имен очередей. Сценарии использования описаны ниже.
- rabbitmq_persistent- флаг, от которого зависит настройка 'durable' для сообщений при запросах- INSERT. По умолчанию:- 0.
- rabbitmq_skip_broken_messages– максимальное количество некорректных сообщений в блоке. Если- rabbitmq_skip_broken_messages = N, то движок отбрасывает- Nсообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.
- rabbitmq_max_block_size
- rabbitmq_flush_interval_ms
- rabbitmq_queue_settings_list- позволяет самостоятельно установить настройки RabbitMQ при создании очереди. Доступные настройки:- x-max-length,- x-max-length-bytes,- x-message-ttl,- x-expires,- x-priority,- x-max-priority,- x-overflow,- x-dead-letter-exchange,- x-queue-type. Настрока- durableдля очереди ставится автоматически.
Настройки форматов данных также могут быть добавлены в списке RabbitMQ настроек.
Example:
Конфигурация сервера RabbitMQ добавляется с помощью конфигурационного файла ClickHouse.
Требуемая конфигурация:
Дополнительная конфигурация:
Описание
Запрос SELECT не очень полезен для чтения сообщений (за исключением отладки), поскольку каждое сообщение может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:
- Создайте потребителя RabbitMQ с помощью движка и рассматривайте его как поток данных.
- Создайте таблицу с необходимой структурой.
- Создайте материализованное представление, которое преобразует данные от движка и помещает их в ранее созданную таблицу.
Когда к движку присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от RabbitMQ и преобразовывать их в необходимый формат с помощью SELECT.
У одной таблицы RabbitMQ может быть неограниченное количество материализованных представлений.
Данные передаются с помощью параметров rabbitmq_exchange_type и rabbitmq_routing_key_list.
Может быть не более одной точки обмена на таблицу. Одна точка обмена может использоваться несколькими таблицами: это позволяет выполнять маршрутизацию по нескольким таблицам одновременно.
Параметры точек обмена:
- direct- маршрутизация основана на точном совпадении ключей. Пример списка ключей:- key1,key2,key3,key4,key5. Ключ сообщения может совпадать с одним из них.
- fanout- маршрутизация по всем таблицам, где имя точки обмена совпадает, независимо от ключей.
- topic- маршрутизация основана на правилах с ключами, разделенными точками. Например:- *.logs,- records.*.*.2020,- *.2018,*.2019,*.2020.
- headers- маршрутизация основана на совпадении- key=valueс настройкой- x-match=allили- x-match=any. Пример списка ключей таблицы:- x-match=all,format=logs,type=report,year=2020.
- consistent_hash- данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ:- rabbitmq-plugins enable rabbitmq_consistent_hash_exchange.
Настройка rabbitmq_queue_base может быть использована в следующих случаях:
- чтобы восстановить чтение из ранее созданных очередей, если оно прекратилось по какой-либо причине, но очереди остались непустыми. Для восстановления чтения из одной конкретной очереди, нужно написать ее имя в rabbitmq_queue_baseнастройку и не указывать настройкиrabbitmq_num_consumersиrabbitmq_num_queues. Чтобы восстановить чтение из всех очередей, которые были созданы для конкретной таблицы, необходимо совпадение следующих настроек:rabbitmq_queue_base,rabbitmq_num_consumers,rabbitmq_num_queues. По умолчанию, если настройкаrabbitmq_queue_baseне указана, будут использованы уникальные для каждой таблицы имена очередей.
- чтобы объявить одни и те же очереди для разных таблиц, что позволяет создавать несколько параллельных подписчиков на каждую из очередей. То есть обеспечивается лучшая производительность. В данном случае, для таких таблиц также необходимо совпадение настроек: rabbitmq_num_consumers,rabbitmq_num_queues.
- чтобы повторно использовать созданные c durableнастройкой очереди, так как они не удаляются автоматически (но могут быть удалены с помощью любого RabbitMQ CLI).
Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
Если параметрыrabbitmq_num_consumers и/или rabbitmq_num_queues заданы вместе с параметром rabbitmq_exchange_type:
- плагин rabbitmq-consistent-hash-exchangeдолжен быть включен.
- свойство message_idдолжно быть определено (уникальное для каждого сообщения/пакета).
При запросах INSERT отправляемым сообщениям добавляются метаданные: messageID и флаг republished - доступны через заголовки сообщений (headers).
Для запросов чтения и вставки не должна использоваться одна и та же таблица.
Пример:
Virtual Columns
- _exchange_name- имя точки обмена RabbitMQ.
- _channel_id- идентификатор канала- ChannelID, на котором было получено сообщение.
- _delivery_tag- значение- DeliveryTagполученного сообщения. Уникально в рамках одного канала.
- _redelivered- флаг- redelivered. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.)
- _message_id- значение поля- messageIDполученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.
- _timestamp- значение поля- timestampполученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.