Exploring Messaging Systems with Argo Events

David Farr
12 min read · Dec 20, 2022



Argo Events is an event-driven workflow automation framework for Kubernetes. It is part of the CNCF and nicely augments Argo Workflows. Check out the GitHub repository for more details.

I consider myself very fortunate to work on the Data Platform team at Intuit. One of the perks of working at Intuit is getting to work closely with the Argo team, the stewards of Argo Workflows, Argo Events, and the other highly successful initiatives that together constitute the Argo umbrella of projects, some of the fastest-growing projects in the CNCF.

My primary motivation for writing this post is to engage with you — the users of Argo Events and the curious technology enthusiasts. We are always looking for feedback; if the ideas presented here resonate with you, please leave a comment or reach out to us on Slack or GitHub.

High Level Argo Events Architecture

My team leverages Argo Events to trigger Argo Workflows when events occur, primarily when other workflows complete. At its core, Argo Events consists of three components: EventSources, EventBuses, and Sensors.

Argo Events architecture from 10,000 feet.

EventSource

An EventSource produces proxy events when events occur in external systems. Supported sources include s3, cron, kafka, and many more. When an event occurs, the EventSource publishes a proxy event as a CloudEvent to an EventBus. Each CloudEvent carries the EventSource name as its source and the event name as its subject. Ordering of events is guaranteed for each (source, subject) pair; that is, messages pertaining to the same event type are processed downstream in the same order as they are produced by the EventSource.
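
To make this concrete, here is a minimal sketch of how an external event might be wrapped as a CloudEvent using the CloudEvents Go SDK. The event source name, subject, and type below are hypothetical, and this is not Argo Events' actual internal code.

import (
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/google/uuid"
)

// Hypothetical helper that wraps a payload from an external system as a
// CloudEvent destined for the EventBus.
func proxyEvent(payload []byte) (cloudevents.Event, error) {
	e := cloudevents.NewEvent()
	e.SetID(uuid.New().String())
	e.SetType("com.example.event")

	// The EventSource name becomes the source and the event name becomes
	// the subject; ordering is preserved per (source, subject) pair.
	e.SetSource("example-event-source")
	e.SetSubject("example")

	err := e.SetData(cloudevents.ApplicationJSON, payload)
	return e, err
}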

EventBus

An EventBus is a messaging system that stores in-flight CloudEvents. Currently, Argo Events supports NATS (deprecated) and Jetstream as the underlying messaging technology. EventSources produce messages to an EventBus and Sensors consume those messages. This decoupling enables multiple actions to be invoked from a single source of truth.

Sensor

Sensors subscribe to the EventBus and trigger user-defined actions when a trigger condition is met. Sensors (like EventSources) manifest as Kubernetes deployments that, due to the way they are implemented, cannot be horizontally scaled. If multiple replicas are specified, a leader instance processes all events while the other instances remain on standby. Argo Events uses graft to implement leader election.

Motivation and Goals

The Data Platform team at Intuit leverages many Sensors to wire up interdependent Argo Workflows. Because each Sensor results in its own Kubernetes deployment, we have ended up with an unmanageable number of deployments (and, transitively, pods) in our cluster. If Sensors could be horizontally scaled, this problem would go away.

Therefore, the goals of our exploration are as follows.

  1. Determine whether the current EventBus technology (Jetstream) can be leveraged in a manner that enables horizontal scaling; and if not,
  2. Determine alternative EventBus technologies that could be used to achieve horizontal scaling.

In addition to these goals, any proposed solution should ensure the Sensor application satisfies the following properties.

  1. Resiliency; when consumers fail, information should not be lost.
  2. High availability; when consumers fail, information should be automatically re-routed to alternate consumers.
  3. Per event type ordering; events with the same event type should be processed by the Sensor in the same order as they are produced by the EventSource.
  4. Uncoordinated consumers; consumers should not need to share information.

Analysis

Before diving into the technology portion of the analysis, we first need to go over Sensors in more detail. At the highest level, a Sensor comprises dependencies and triggers. Dependencies are the events that triggers depend on; they can be combined in a trigger condition. Triggers define what actions to take when a trigger condition is satisfied. Argo Events supports multiple actions out-of-the-box including slack, kafka, webhooks, and many more.

Users define their Sensors in namespace-scoped Kubernetes objects. For the purpose of our analysis let’s consider the following Sensor.

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: example-1
spec:
  dependencies:
    - name: blue
    - name: yellow
    - name: red
  triggers:
    - name: trigger-1
      conditions: blue
    - name: trigger-2
      conditions: blue && yellow
    - name: trigger-3
      conditions: blue && yellow && red

Some required fields have been removed from this Sensor definition for the purpose of clarity. Here, we have a Sensor with three dependencies and three triggers. The first trigger depends solely on the blue event, the second trigger depends on the blue and yellow events, and the third trigger depends on the blue, yellow, and red events.

When a Sensor consumes an event, a handler is invoked for each trigger, one event at a time. The state of the trigger is updated to account for the new event and the trigger condition is then evaluated. If the condition is not satisfied, no further steps are taken. If the condition is satisfied, the action defined on the trigger (omitted from the specification above) is invoked and the state of the trigger is reset.
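
As a rough illustration of this loop, the sketch below models a trigger handler whose condition is a simple AND of dependency names (real conditions can also use operators such as ||). The type and method names are hypothetical, not the actual Argo Events implementation.

// Hypothetical in-memory state for a single trigger.
type triggerState struct {
	required []string        // dependencies named in the trigger condition
	seen     map[string]bool // dependencies observed since the last reset
}

// Record that an event for the named dependency has arrived.
func (t *triggerState) update(dependency string) {
	t.seen[dependency] = true
}

// The condition is satisfied once every required dependency has been seen.
func (t *triggerState) satisfied() bool {
	for _, dep := range t.required {
		if !t.seen[dep] {
			return false
		}
	}
	return true
}

// After the trigger fires, its state is reset and accumulation starts over.
func (t *triggerState) reset() {
	t.seen = map[string]bool{}
}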

Jetstream

In the current design, events are consumed by the Sensor from a Jetstream stream. For each trigger, a goroutine is created that maintains independent subscriptions on the stream, one for each dependency the trigger requires. We can visualize the example Sensor above as follows.

Each trigger maintains subscriptions on the stream, one for each required dependency. Trigger processes are always part of the same application instance.

If we want to scale this approach up to multiple application instances, we would need to use a Jetstream queue group. A queue group enables a number of subscribers to receive messages in a round-robin fashion, ensuring messages are processed once by a single instance of the application. This approach has two issues.

  1. Application instances would need to share information regarding event occurrences, violating the coordination principle.
  2. Jetstream queue groups do not guarantee ordered processing, violating the ordering guarantees we wish to achieve.
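
For context, a queue group subscription with the NATS Go client looks roughly like the sketch below; the stream subject and queue group name are hypothetical. Every member of the group receives a share of the messages rather than all of them, and concurrent members give up the ordering guarantee.

import "github.com/nats-io/nats.go"

func subscribeQueueGroup() error {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		return err
	}

	js, err := nc.JetStream()
	if err != nil {
		return err
	}

	// Every subscriber sharing the queue group name "sensor" receives a
	// round-robin share of the stream's messages, so no single instance
	// sees every event.
	_, err = js.QueueSubscribe("events.>", "sensor", func(m *nats.Msg) {
		// Process the message, then acknowledge it.
		m.Ack()
	})
	return err
}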

Alternatively, we could scale up our Sensor by divvying up the triggers across instances. However, this approach would still violate our design goals, as it would require the instances to coordinate which of them is responsible for which subset of triggers.

Kafka

To achieve our application goals, let’s consider an alternative EventBus technology — Kafka. Kafka has two features we can leverage in our application design.

  1. Per-partition message ordering
  2. Transactions

Each application instance (here we have two) is responsible for a subset of the triggers. The maximum number of instances is equal to the number of triggers defined in the Sensor.

This design utilizes three Kafka topics — the event topic, which can be used by multiple Sensors, and the trigger and action topics, which are specific to a Sensor. Events are consumed from the event topic by a fan out process. These messages are keyed by the (source, subject) pair known to the EventSource, but for our purposes we can imagine these messages have simple keys of {blue, yellow, red}. The fan out process is responsible for publishing messages to the trigger topic, keyed by the trigger name. We can do the fan out in a transaction to ensure exactly once delivery to the trigger topic. The trigger handlers are responsible for publishing a message to the action topic if the trigger condition is satisfied. Crucially, the trigger handlers do not bump the subscription’s trigger topic offset until the condition is satisfied to ensure application resiliency — if the application fails, messages are re-consumed from the trigger topic.

Why not fan out messages in the EventSource?
EventSources decouple events from actions. Multiple Sensors can subscribe to events produced by an EventSource, and the EventSource has no knowledge of the Sensors (nor their triggers) that consume from it.
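
A minimal sketch of the fan out step, using the Sarama client, might look like the following. The triggersByDependency map and the trigger topic name are hypothetical stand-ins, and the producer is assumed to be configured for transactions; this is an illustration rather than the proof-of-concept code.

import "github.com/Shopify/sarama"

// fanOut re-keys one event topic message by trigger name and commits the
// consumed offset within the same transaction, so every interested trigger
// receives the event exactly once.
func fanOut(
	producer sarama.AsyncProducer,
	groupID string,
	triggersByDependency map[string][]string,
	msg *sarama.ConsumerMessage,
) error {
	if err := producer.BeginTxn(); err != nil {
		return err
	}

	// Publish one copy per trigger that depends on this event. Keying by
	// trigger name sends all messages for a trigger to the same partition.
	for _, trigger := range triggersByDependency[string(msg.Key)] {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "example-sensor-trigger",
			Key:   sarama.StringEncoder(trigger),
			Value: sarama.ByteEncoder(msg.Value),
		}
	}

	// Commit the event topic offset as part of the transaction so the fan
	// out either happens completely or not at all.
	offsets := map[string][]*sarama.PartitionOffsetMetadata{
		msg.Topic: {{Partition: msg.Partition, Offset: msg.Offset + 1}},
	}
	if err := producer.AddOffsetsToTxn(offsets, groupID); err != nil {
		producer.AbortTxn()
		return err
	}

	return producer.CommitTxn()
}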

The action topic is introduced to decouple trigger processing from action invocation. When the trigger handler publishes a message to the action topic it becomes free to process new messages. Actions will still be invoked in the correct order thanks to Kafka’s partition ordering guarantee.
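
For illustration, an action consumer could look roughly like the sketch below: it reads from the action topic, invokes the action, and only marks the offset once the invocation succeeds. The ActionConsumer type and invokeAction field are hypothetical placeholders.

// Hypothetical action consumer; the Setup and Cleanup methods required by
// sarama.ConsumerGroupHandler are omitted for brevity.
type ActionConsumer struct {
	invokeAction func(*sarama.ConsumerMessage) error
}

func (a *ActionConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// Invoke the user-defined action, for example submitting an Argo
		// Workflow.
		if err := a.invokeAction(msg); err != nil {
			return err
		}

		// Only advance the offset once the action has succeeded so that a
		// failed invocation is retried after a restart. Messages for the
		// same trigger share a partition, so they are invoked in order.
		session.MarkMessage(msg, "")
	}
	return nil
}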

This design would allow the Sensor application to scale up to a maximum number of instances equal to the number of triggers in the Sensor definition (assuming the trigger topic has at least that many partitions). When the scale is less than the number of triggers, at least one instance is responsible for handling more than one trigger. The assignment of triggers to instances is achieved without the instances coordinating amongst themselves: because messages in the trigger topic are keyed by trigger name, Kafka assigns each instance a subset of partitions, and therefore a subset of triggers.

One issue with this design arises when the number of partitions is less than the number of triggers. To illustrate this, consider a slightly different Sensor definition with disjoint triggers, that is, triggers that do not share any dependencies.

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: example-2
spec:
  dependencies:
    - name: blue
    - name: yellow
    - name: red
  triggers:
    - name: trigger-1
      conditions: blue && yellow
    - name: trigger-2
      conditions: red

Now, imagine our trigger topic has only a single partition and we consume the following events.

{blue, red}

Our fan out process will publish two messages to the trigger topic and both messages will, necessarily, land on the same partition.

{payload: blue, key: trigger-1, partition: 0, offset: 1}
{payload: red, key: trigger-2, partition: 0, offset: 2}

First, the blue event is consumed by the trigger-1 handler. No action is taken because the trigger condition is not yet satisfied; we are still waiting for a yellow event.

Next, a red message is consumed by the trigger-2 handler. Unlike the previous trigger, the condition is satisfied and we need to publish a message to the action topic. But should we bump the offset? Let’s walk through two possibilities and consider the behaviour of our application under failure.

Scenario 1: bump the committed offset for partition 0 past the red message (to 3).

In the first scenario, if our application fails and is restarted we begin consuming messages at offset 3. The state of the trigger-1 handler is reset, but we have effectively lost the knowledge that the blue event occurred. Our application would not be resilient, violating the first principle of our application design.

Scenario 2: hold the committed offset for partition 0 at the blue message (offset 1).

In the second scenario, if our application fails and is restarted we begin consuming messages at offset 1. We re-consume the blue event and repopulate the state of the trigger-1 handler; so far so good. But then we re-consume the red event and the trigger-2 handler publishes a duplicate message to the action topic; not so good.

The following code outlines possible implementations for both scenarios using the Sarama Go client for Kafka. For the sake of simplicity, it is assumed that there exists an implementation of the Handler interface that implements the methods described below and that the Sensor struct is instantiated with a handler for each trigger.

type Sensor struct {
	name     string
	consumer sarama.ConsumerGroup
	producer sarama.AsyncProducer

	// Instantiated with a struct conforming to the Handler interface
	// below for each trigger defined in the Sensor specification.
	handlers []Handler
}

type Handler interface {
	// Returns the name of the trigger defined in the Sensor
	// specification.
	Name() string

	// Updates the state of the handler with the given message.
	Update(msg *sarama.ConsumerMessage)

	// Returns true if the trigger condition is satisfied, otherwise
	// false.
	Satisfied() bool

	// Returns the smallest offset of all messages the handler has
	// stored in memory for the given partition. If the handler has no
	// messages for the partition, returns nil.
	Offset(partition int32) *int64

	// Returns a message if the trigger condition is satisfied,
	// otherwise returns nil.
	Action() *sarama.ProducerMessage

	// Resets the state of the handler.
	Reset()
}

func (s *Sensor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}

			// Begin a transaction.
			s.producer.BeginTxn()

			// TODO: initialize the offset for the transaction. Both
			// scenarios start just past the message we consumed;
			// scenario 2 rewinds the offset below.
			offset := msg.Offset + 1

			for _, handler := range s.handlers {
				// Update the trigger handler if the message consumed is
				// intended for this trigger.
				if handler.Name() == string(msg.Key) {
					handler.Update(msg)
				}

				if handler.Satisfied() {
					// If the trigger condition is satisfied, publish
					// a message to the action topic.
					s.producer.Input() <- handler.Action()

					// Since the trigger is satisfied, reset the handler.
					handler.Reset()
				}

				// TODO: for scenario 2, rewind the offset to the smallest
				// offset the handler still holds in memory for this
				// partition so nothing is lost on failure. Skipping this
				// step gives scenario 1, which bumps past everything
				// consumed. Both approaches have their own pros and cons.
				handlerOffset := handler.Offset(msg.Partition)

				if handlerOffset != nil && *handlerOffset < offset {
					offset = *handlerOffset // scenario 2
				}
			}

			offsets := map[string][]*sarama.PartitionOffsetMetadata{
				msg.Topic: {{
					Partition: msg.Partition,
					Offset:    offset,
					Metadata:  nil,
				}},
			}

			// Bump the offset as part of the transaction.
			s.producer.AddOffsetsToTxn(offsets, s.name)

			// Commit the transaction. If the application fails anywhere
			// between the start and end of the transaction, all messages
			// will be redelivered to a new consumer.
			s.producer.CommitTxn()

		case <-session.Context().Done():
			return nil
		}
	}
}

While neither scenario is optimal, we should prefer the second option as it does not violate our goal of application resiliency. Furthermore, there are techniques we can implement to ensure idempotency downstream in the action handler.
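
One such technique, sketched below under my own assumptions rather than anything Argo Events implements today, is to attach a deterministic idempotency key derived from the trigger topic coordinates and have the action handler skip keys it has already acted on.

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// Attach an id derived from the trigger topic partition and offset, which
// is identical every time the same message is re-consumed.
func actionMessage(topic, trigger string, partition int32, offset int64, payload []byte) *sarama.ProducerMessage {
	key := fmt.Sprintf("%s/%d/%d", trigger, partition, offset)
	return &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(trigger),
		Value: sarama.ByteEncoder(payload),
		Headers: []sarama.RecordHeader{
			{Key: []byte("idempotency-key"), Value: []byte(key)},
		},
	}
}

// The action handler remembers the keys it has already acted on (here in
// memory, in practice a durable store) and ignores duplicates caused by
// re-consumption.
func shouldInvoke(seen map[string]bool, msg *sarama.ConsumerMessage) bool {
	for _, h := range msg.Headers {
		if string(h.Key) == "idempotency-key" {
			if seen[string(h.Value)] {
				return false
			}
			seen[string(h.Value)] = true
		}
	}
	return true
}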

Pulsar

Enter Pulsar. Pulsar has many of the same features as Kafka such as per-partition ordering and transactions, but crucially also supports per-message acknowledgments. With Pulsar, we can retain the same design we explored with Kafka while also ensuring exactly once delivery to the action topic. Let’s re-examine the Sensor above, but this time with Pulsar message acknowledgments as opposed to a Kafka subscription offset.

Scenario 3: acknowledge the message at offset 2, but not the message at offset 1.

In this scenario, if our application fails and is restarted we will re-consume the blue message, repopulating the state of the trigger-1 handler. But we will not re-consume the red message, and therefore will not publish a duplicate message to the action topic.

The following code outlines a possible Pulsar-based implementation. Like before, it is assumed there exists an implementation of the Handler interface and that the Sensor struct is instantiated with a handler for each trigger.

type Sensor struct {
	consumer pulsar.Consumer
	producer pulsar.Producer

	// Instantiated with a struct conforming to the Handler interface
	// below for each trigger defined in the Sensor.
	handlers map[string]Handler
}

type Handler interface {
	// Returns the name of the trigger defined in the Sensor
	// specification.
	Name() string

	// Updates the state of the handler with the given message.
	Update(msg pulsar.Message)

	// Returns true if the trigger condition is satisfied, otherwise
	// false.
	Satisfied() bool

	// Returns the message ids of all processed messages that are
	// no longer required and can be acked.
	Ack() []pulsar.MessageID

	// Returns a message if the trigger condition is satisfied,
	// otherwise returns nil.
	Action() *pulsar.ProducerMessage

	// Resets the state of the handler.
	Reset()
}

func (s *Sensor) consume(ctx context.Context) {
	for {
		msg, _ := s.consumer.Receive(ctx)

		if handler, ok := s.handlers[msg.Key()]; ok {
			// Begin a transaction. The transaction feature is not yet
			// available in the Pulsar go client, so here we are
			// imagining what the api might look like.
			txn := s.producer.BeginTxn()

			// Update the trigger handler state.
			handler.Update(msg)

			if handler.Satisfied() {
				// If the trigger condition is satisfied, publish a
				// message to the action topic.
				txn.Send(ctx, handler.Action())

				// Reset the handler.
				handler.Reset()
			}

			// Ack all messages that are no longer required.
			for _, id := range handler.Ack() {
				txn.AckID(id)
			}

			// Commit the transaction.
			txn.Commit()
		}
	}
}

Proof of Concept

I have put together a proof of concept for both a Kafka and Pulsar based Argo Events EventBus called kafkanaut (the name predates the idea of exploring Pulsar). Please check it out if you want to see the complete versions of the snippets above, but beware, the code is far from production ready.

While the concepts discussed here are demonstrated in the codebase, the Pulsar implementation is missing the following features.

  1. Transactions. Neither the fan out process nor the trigger handlers implement transactions, which can lead to inconsistencies. Transactions are not yet supported in the Go client library, but they are a planned feature.
  2. Consumer event listener in the trigger handlers. If a (topic, partition) pair is unassigned from a consumer, the consumer is not notified. This can lead to inconsistencies when the application is scaled up. This feature is now available in the go client library (thanks Pulsar team) and can be implemented in our application.

Next Steps

While Pulsar would certainly make our application simpler to implement, it’s not all smooth sailing. Our application would require one feature in the Pulsar go client library (detailed above) which is not yet available.

Another angle I want to explore is enabling action handler idempotency in the Kafka EventBus without introducing additional state. I have heard that offset metadata might be leveraged to achieve this, but I need to explore this option further.

But most of all we want to hear from you, the Argo Events community, the everyday users of EventSources, EventBuses, and Sensors. Is the lack of horizontal scaling in Argo Events something you care about? If so, does the messaging system technology choice matter to you? Or do you just want everything to work out-of-the-box?

Please leave a comment, or get in contact with us on Slack or GitHub. We would love to hear from you.

