JetStream

Detailed documentation on the NATS JetStream component

Component format

To set up JetStream pub/sub, create a component of type pubsub.jetstream. See the pub/sub broker component file to learn how ConsumerID is automatically generated. Read the How-to: Publish and Subscribe guide on how to create and apply a pub/sub configuration.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: jetstream-pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: jwt # Optional. Used for decentralized JWT authentication.
    value: "eyJhbGciOiJ...6yJV_adQssw5c"
  - name: seedKey # Optional. Used for decentralized JWT authentication.
    value: "SUACS34K232O...5Z3POU7BNIL4Y"
  - name: tls_client_cert # Optional. Used for TLS Client authentication.
    value: "/path/to/tls.crt"
  - name: tls_client_key # Optional. Used for TLS Client authentication.
    value: "/path/to/tls.key"
  - name: token # Optional. Used for token based authentication.
    value: "my-token"
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"
  - name: startSequence
    value: 1
  - name: startTime # In Unix format
    value: 1630349391
  - name: flowControl
    value: false
  - name: ackWait
    value: 10s
  - name: maxDeliver
    value: 5
  - name: backOff
    value: "50ms, 1s, 10s"
  - name: maxAckPending
    value: 5000
  - name: replicas
    value: 1
  - name: memoryStorage
    value: false
  - name: rateLimit
    value: 1024
  - name: heartbeat
    value: 15s
  - name: ackPolicy
    value: explicit
  - name: deliverPolicy
    value: all
  - name: domain
    value: hub
  - name: apiPrefix
    value: PREFIX

Spec metadata fields

| Field | Required | Details | Example |
|-------|----------|---------|---------|
| natsURL | Y | NATS server address URL | "nats://localhost:4222" |
| jwt | N | NATS decentralized authentication JWT | "eyJhbGciOiJ...6yJV_adQssw5c" |
| seedKey | N | NATS decentralized authentication seed key | "SUACS34K232O...5Z3POU7BNIL4Y" |
| tls_client_cert | N | NATS TLS client authentication certificate | "/path/to/tls.crt" |
| tls_client_key | N | NATS TLS client authentication key | "/path/to/tls.key" |
| token | N | NATS token-based authentication | "my-token" |
| name | N | NATS connection name | "my-conn-name" |
| streamName | N | Name of the JetStream stream to bind to | "my-stream" |
| durableName | N | Durable name | "my-durable" |
| queueGroupName | N | Queue group name | "my-queue" |
| startSequence | N | Start sequence | 1 |
| startTime | N | Start time in Unix format | 1630349391 |
| flowControl | N | Flow control | true |
| ackWait | N | Ack wait | 10s |
| maxDeliver | N | Max deliver | 15 |
| backOff | N | Backoff | "50ms, 1s, 5s, 10s" |
| maxAckPending | N | Max ack pending | 5000 |
| replicas | N | Replicas | 3 |
| memoryStorage | N | Memory storage | false |
| rateLimit | N | Rate limit | 1024 |
| heartbeat | N | Heartbeat | 10s |
| ackPolicy | N | Ack policy | explicit |
| deliverPolicy | N | One of: all, last, new, sequence, time | all |
| domain | N | JetStream Leafnodes | HUB |
| apiPrefix | N | JetStream Leafnodes | PREFIX |
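
The startTime field takes a Unix timestamp. The following is a minimal sketch for producing and double-checking such a value with Python's standard library. It assumes the value is whole seconds since the epoch in UTC (the 10-digit example suggests seconds rather than milliseconds). The helper names are illustrative and not part of Dapr or NATS.

```python
from datetime import datetime, timezone


def to_unix_seconds(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole Unix seconds."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time surprises")
    return int(moment.timestamp())


def from_unix_seconds(seconds: int) -> datetime:
    """Convert Unix seconds back to a UTC datetime, e.g. to double-check a value."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


if __name__ == "__main__":
    # The example startTime value used in this document.
    print(from_unix_seconds(1630349391).isoformat())  # 2021-08-30T18:49:51+00:00
    # Going the other way yields the value to put in the component file.
    print(to_unix_seconds(datetime(2021, 8, 30, 18, 49, 51, tzinfo=timezone.utc)))  # 1630349391
```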

Create a NATS server


You can run a NATS Server with JetStream enabled locally using Docker:

docker run -d -p 4222:4222 nats:latest -js

You can then interact with the server using the client port: localhost:4222.
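
To confirm that the local server is reachable and was started with JetStream, one option is to read the INFO line that a NATS server sends when a client connects. The sketch below uses only Python's standard library. It assumes the server is listening on localhost:4222 as in the Docker command above and that the INFO payload carries a `jetstream` flag when JetStream is enabled; the function names are hypothetical.

```python
import json
import socket


def parse_info_line(line: str) -> dict:
    """Parse the 'INFO {...}' line a NATS server sends right after a client connects."""
    verb, _, payload = line.strip().partition(" ")
    if verb.upper() != "INFO":
        raise ValueError(f"expected an INFO line, got: {line!r}")
    return json.loads(payload)


def jetstream_enabled(host: str = "localhost", port: int = 4222, timeout: float = 3.0) -> bool:
    """Connect to the client port, read the first line, and inspect the 'jetstream' flag."""
    with socket.create_connection((host, port), timeout=timeout) as conn:
        with conn.makefile("rb") as reader:
            first_line = reader.readline().decode("utf-8")
    # Assumption: servers without JetStream omit the flag or set it to false.
    return bool(parse_info_line(first_line).get("jetstream", False))


if __name__ == "__main__":
    print("JetStream enabled:", jetstream_enabled())
```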


Install NATS JetStream on Kubernetes by using Helm:

helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install --set nats.jetstream.enabled=true my-nats nats/nats

This installs a single NATS server into the default namespace. To interact with NATS, find the service with:

kubectl get svc my-nats

For more information on helm chart settings, see the Helm chart documentation.

Create JetStream

You must create a NATS JetStream stream for the specific subject you intend to use. For example, for a NATS server running locally, run:

nats -s localhost:4222 stream add myStream --subjects mySubject
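
Once the stream exists, an application can publish to its subject through the Dapr sidecar's HTTP publish endpoint. The following is a minimal sketch, not a definitive client. It assumes a sidecar is running with its HTTP port in `DAPR_HTTP_PORT` (falling back to 3500), that the component is named `jetstream-pubsub` as in the component file above, and that the Dapr topic name corresponds to the stream subject `mySubject`. The payload is hypothetical.

```python
import json
import os
import urllib.request


def build_publish_request(pubsub_name: str, topic: str, payload: dict) -> urllib.request.Request:
    """Build a POST request for the sidecar's /v1.0/publish/<pubsub>/<topic> endpoint."""
    port = os.environ.get("DAPR_HTTP_PORT", "3500")
    url = f"http://localhost:{port}/v1.0/publish/{pubsub_name}/{topic}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical payload; the topic matches the subject given to `nats stream add`.
    request = build_publish_request("jetstream-pubsub", "mySubject", {"orderId": 42})
    with urllib.request.urlopen(request, timeout=5) as response:
        print("sidecar responded with HTTP status", response.status)
```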

Example: Competing consumers pattern

Let’s say you’d like each message to be processed by only one application or pod with the same app-id. Typically, the consumerID metadata spec helps you define competing consumers.

Since consumerID is not supported in NATS JetStream, you need to specify durableName and queueGroupName to achieve the competing consumers pattern. For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: name
    value: "my-conn-name"
  - name: streamName
    value: "my-stream"
  - name: durableName 
    value: "my-durable-subscription"
  - name: queueGroupName
    value: "my-queue-group"
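
A subscriber that takes part in this pattern could look like the sketch below. It uses Dapr's programmatic subscription endpoint (`/dapr/subscribe`) and Python's standard HTTP server. The topic `mySubject`, the route `/messages`, and port 6001 are assumptions chosen for illustration; the pub/sub name `pubsub` matches the component above.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# Subscription list returned to the Dapr sidecar when it calls GET /dapr/subscribe.
SUBSCRIPTIONS = [{"pubsubname": "pubsub", "topic": "mySubject", "route": "/messages"}]


class Handler(BaseHTTPRequestHandler):
    def _send_json(self, body) -> None:
        data = json.dumps(body).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self) -> None:
        if self.path == "/dapr/subscribe":
            self._send_json(SUBSCRIPTIONS)
        else:
            self.send_error(404)

    def do_POST(self) -> None:
        if self.path != "/messages":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        event = json.loads(self.rfile.read(length) or b"{}")
        print("received:", event.get("data"))
        # Tell the sidecar the message was processed so it can be acknowledged.
        self._send_json({"status": "SUCCESS"})


if __name__ == "__main__":
    HTTPServer(("127.0.0.1", 6001), Handler).serve_forever()
```

If several instances of this app run with the same app-id, each with its own sidecar loading the component above, the shared durableName and queueGroupName should cause each message to be delivered to only one of them.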