返回介绍

PART Ⅰ : 容器云OPENSHIFT

PART Ⅱ:容器云 KUBERNETES

PART Ⅲ:持续集成与持续部署

PART Ⅴ:日志/监控/告警

PART Ⅵ:基础

PART Ⅶ:数据存储、处理

PART VIII:CODE

PART X:HACKINTOSH

PART XI:安全

Pulsar 的 WebSocket API

发布于 2024-06-08 21:16:45 字数 12632 浏览 0 评论 0 收藏 0

Pulsar WebSocket API 提供了一种使用没有官方客户端库的语言与 Pulsar 交互的简单方法。通过 WebSocket,您可以发布和使用消息并使用客户端功能矩阵页面上提供的功能。

官方文档:https://pulsar.apache.org/docs/client-libraries-websocket/

1、在Broker的8080端口开起

conf/broker.conf

webSocketServiceEnabled=true

2、作为独立组件启用

conf/websocket.conf

configurationMetadataStoreUrl=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster

# 如果要开起TLS,需要设置一下参数
tlsEnabled=true
tlsAllowInsecureConnection=false
tlsCertificateFilePath=/path/to/client-websocket.cert.pem
tlsKeyFilePath=/path/to/client-websocket.key-pk8.pem
tlsTrustCertsFilePath=/path/to/ca.cert.pem
bin/pulsar-daemon start websocket

1、生产者Endpoint

请求URL

ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic?参数1=值&参数2=值
参数类型必须描述
sendTimeoutMillislongnoSend timeout (默认: 30s)
batchingEnabledbooleannoEnable batching of messages (默认: false)
batchingMaxMessagesintno批处理中允许的最大消息数 (默认: 1000)
maxPendingMessagesintnoSet the max size of the internal-queue holding the messages (默认: 1000)
batchingMaxPublishDelaylongnoTime period within which the messages will be batched (默认: 10ms)
messageRoutingModestringnoMessage routing mode for the partitioned producer: SinglePartition, RoundRobinPartition
compressionTypestringnoCompression type: LZ4, ZLIB
producerNamestringnoSpecify the name for the producer. Pulsar will enforce only one producer with same name can be publishing on a topic
initialSequenceIdlongnoSet the baseline for the sequence ids for messages published by the producer.
hashingSchemestringnoHashing function to use when publishing on a partitioned topic: JavaStringHash, Murmur3_32Hash
tokenstringnoAuthentication token, this is used for the browser javascript client

消息体样本

{
  "payload": "SGVsbG8gV29ybGQ=",
  "properties": {"key1": "value1", "key2": "value2"},
  "context": "1"
}

消息体属性

属性类型必须描述
payloadstringyesBase-64 encoded payload
propertieskey-value pairsnoApplication-defined properties
contextstringnoApplication-defined request identifier
keystringnoFor partitioned topics, decides which partition to use
replicationClustersarraynoRestrict replication to this list of clusters, specified by name

2、消费者Endpoint

请求URL

ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription?参数1=值&参数2=值
KeyTypeRequired?Explanation
ackTimeoutMillislongno设置未确认消息的超时时间 (默认: 0)
subscriptionTypestringno订阅类型: Exclusive, Failover, Shared, Key_Shared
receiverQueueSizeintno设置消费者接收队列的大小 (默认: 1000)
consumerNamestringno消费者名字
priorityLevelintno设置消费者定义优先级
maxRedeliverCountintnoDefine a maxRedeliverCount for the consumer (默认: 0). Activates Dead Letter Topic feature.
deadLetterTopicstringnoDefine a deadLetterTopic for the consumer (默认: {topic}-{subscription}-DLQ). Activates Dead Letter Topic feature.
pullModebooleanno是否开起pull模式 (默认: false)
negativeAckRedeliveryDelayintnoWhen a message is negatively acknowledged, the delay time before the message is redelivered (in milliseconds). 默认: 60000.
tokenstringnoAuthentication token, this is used for the browser javascript client

消息体样本


{
  "messageId": "CAMQADAA",
  "payload": "hvXcJvHW7kOSrUn17P2q71RA5SdiXwZBqw==",
  "properties": {},
  "publishTime": "2021-10-29T16:01:38.967-07:00",
  "redeliveryCount": 0,
  "encryptionContext": {
    "keys": {
      "client-rsa.pem": {
        "keyValue": "jEuwS+PeUzmCo7IfLNxqoj4h7txbLjCQjkwpaw5AWJfZ2xoIdMkOuWDkOsqgFmWwxiecakS6GOZHs94x3sxzKHQx9Oe1jpwBg2e7L4fd26pp+WmAiLm/ArZJo6JotTeFSvKO3u/yQtGTZojDDQxiqFOQ1ZbMdtMZA8DpSMuq+Zx7PqLo43UdW1+krjQfE5WD+y+qE3LJQfwyVDnXxoRtqWLpVsAROlN2LxaMbaftv5HckoejJoB4xpf/dPOUqhnRstwQHf6klKT5iNhjsY4usACt78uILT0pEPd14h8wEBidBz/vAlC/zVMEqiDVzgNS7dqEYS4iHbf7cnWVCn3Hxw==",
        "metadata": {}
      }
    },
    "param": "Tfu1PxVm6S9D3+Hk",
    "compressionType": "NONE",
    "uncompressedMessageSize": 0,
    "batchSize": {
      "empty": false,
      "present": true
    }
  }
}

消息体属性

  • 基础属性

    | Key | Type | Required? | Explanation | | ------------------------- | ----------------- | --------- | ------------------------------------------------------------ | | messageId | string | yes | 消息 ID | | payload | string | yes | Base-64 encoded payload | | publishTime | string | yes | 推送时间戳 | | redeliveryCount | number | yes | Number of times this message was already delivered | | properties | key-value pairs | no | Application-defined properties | | key | string | no | Original routing key set by producer | | encryptionContext | EncryptionContext | no | Encryption context that consumers can use to decrypt received messages | | param | string | no | Initialization vector for cipher (Base64 encoding) | | batchSize | string | no | Number of entries in a message (if it is a batch message) | | uncompressedMessageSize | string | no | Message size before compression | | compressionType | string | no | Algorithm used to compress the message payload |

  • encryptionContext related parameter

    | Key | Type | Required? | Explanation | | ------ | ----------------------- | --------- | ------------------------------------------------------------ | | keys | key-EncryptionKey pairs | yes | Key in key-EncryptionKey pairs is an encryption key name. Value in key-EncryptionKey pairs is an encryption key object. |

  • encryptionKey related parameters

    | Key | Type | Required? | Explanation | | ---------- | --------------- | --------- | -------------------------------- | | keyValue | string | yes | Encryption key (Base64 encoding) | | metadata | key-value pairs | no | Application-defined metadata |

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文