Introduction to programming with event streams
Author: Marko Beelman, Philips Medical Systems Böblingen
Contribution – Embedded Software Engineering Congress 2015
The increasing networking of devices and the use of cloud services present new challenges for software development. Sensor readings and push notifications from the cloud – more and more events need to be processed. Reactive Extensions allow events to be converted into streams and easily coordinated. Schedulers also significantly simplify asynchronous processing.
Anyone who owns a smartphone appreciates the many functions these devices offer. Whether it's navigation, fitness tracking, chat, a spirit level, or top news delivered directly to the smartphone – smartphones handle all these functions with varying degrees of success. For software development, however, this also means reacting to a multitude of events, be it GPS coordinates, sensor data, or push notifications from the cloud. Software must be able to process this flood of events intelligently and transparently for the user. On smartphones, the user interface should not become overloaded due to this deluge of events.
It's not just smartphones that are facing this "flood of events." The increasing connectivity of the devices around us will also lead to a large number of events that, in turn, need to be processed by software. The ever-expanding cloud services also contribute to a continuous flow of events. The classic approach to reacting to these asynchronous events is callbacks, which are triggered when an event arrives. However, this approach can become very unwieldy and inefficient with a large number of events and, above all, with many different event sources. Event sources with a high event frequency can quickly cause performance problems for software.
To prevent such problems, Reactive Extensions are a good option. These software libraries originally come from Microsoft and were released as open source in 2012. Since then, the number of supported programming languages has been steadily increasing. For almost every widely used language, there is an extension that supports the concept of "reactive programming." But what exactly is it all about? Essentially, it's about abstracting events or other data sources as a stream of events.
These event streams are represented by a so-called Observable. Observables are fundamentally similar to the Gang of Four's Observer pattern. A developer registers with the Observable via a "subscription" and must provide an Observer to do so. This Observer contains up to three callbacks that react to the event stream. One callback is responsible for the actual event processing: when an event occurs within the Observable, the Observable calls the Observer's OnNext() method. The Observer has two further callbacks, OnError() and OnCompleted(), which report the status of the Observable. OnError() is called when an error occurs in the event stream, which allows the developer to react to it explicitly. Once OnCompleted() has been called, no further events will originate from that Observable; the event stream is closed.
Notifications about errors in the event stream can be particularly useful, for example, to let the consumer re-subscribe directly to a new stream or to inform the user about problems with an event source. The `OnCompleted()` notification can also be useful, for example, to close the network connection behind an event stream.
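The subscription contract described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the classes Observer and Observable and the names on_next, on_error, on_completed, emit, fail and complete are hypothetical stand-ins chosen for this example, not the API of any particular Reactive Extensions implementation.

```python
class Observer:
    """Bundles up to three callbacks; any callback left out does nothing."""

    def __init__(self, on_next=None, on_error=None, on_completed=None):
        self.on_next = on_next or (lambda value: None)
        self.on_error = on_error or (lambda error: None)
        self.on_completed = on_completed or (lambda: None)


class Observable:
    """Hypothetical event stream that pushes events to its subscribers."""

    def __init__(self):
        self._observers = []
        self._closed = False

    def subscribe(self, observer):
        """Register an observer and return a function that cancels it."""
        self._observers.append(observer)

        def unsubscribe():
            if observer in self._observers:
                self._observers.remove(observer)

        return unsubscribe

    def emit(self, value):
        """Deliver one event to every observer (ignored once closed)."""
        if not self._closed:
            for observer in list(self._observers):
                observer.on_next(value)

    def fail(self, error):
        """Report an error; the stream is closed afterwards."""
        if not self._closed:
            self._closed = True
            for observer in list(self._observers):
                observer.on_error(error)

    def complete(self):
        """Signal that no further events will follow."""
        if not self._closed:
            self._closed = True
            for observer in list(self._observers):
                observer.on_completed()


def demo():
    log = []
    stream = Observable()
    stream.subscribe(Observer(
        on_next=lambda value: log.append(("next", value)),
        on_error=lambda error: log.append(("error", str(error))),
        on_completed=lambda: log.append(("completed",)),
    ))
    stream.emit(21.5)
    stream.emit(21.7)
    stream.complete()
    stream.emit(99.9)  # ignored: the stream is already closed
    return log
```

Calling demo() records the two values followed by the completion notification; the value emitted after complete() never reaches the observer.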
The library truly shines when it comes to composing and filtering such event streams. A large number of operators and methods exist to tailor a stream to specific needs. The original event stream, the true source of the events, remains unchanged. For example, an event stream with a high event frequency can be thinned out using appropriate operators. If a sensor delivers its value every 10 ms, the `Sample()` operator can create a new event stream that reports only the current value every 100 ms. A further event stream that recalculates the average with each new value could also be useful. In this case, it makes sense to use the already thinned-out event stream as a basis. This creates a second derived event stream whose events are the average of all values that have occurred so far. Fig. 1 illustrates the progression of events in all three event streams. This type of diagram is called a marble diagram.
The Reactive Extensions include a large number of methods that can derive further streams from a single stream.
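The sampling and averaging example can be approximated offline with plain Python lists. This is a simplified model, not the real operators: events are assumed to be (timestamp in ms, value) tuples, and the functions sample and running_average are hypothetical helpers written for this sketch. The window boundaries (fixed 100 ms windows starting at 0, with the last window flushed at the end) are also an assumption.

```python
def sample(events, period_ms):
    """Keep only the latest value of each period (simplified offline model).

    Each result is stamped with the end of the window it belongs to.
    """
    sampled = []
    window = None   # index of the window currently being filled
    latest = None   # most recent value seen in that window
    for t_ms, value in events:
        current = t_ms // period_ms
        if window is not None and current != window:
            sampled.append(((window + 1) * period_ms, latest))
        window = current
        latest = value
    if window is not None:
        sampled.append(((window + 1) * period_ms, latest))
    return sampled


def running_average(events):
    """Emit the average of all values seen so far, once per input event."""
    averaged = []
    total = 0.0
    for count, (t_ms, value) in enumerate(events, start=1):
        total += value
        averaged.append((t_ms, total / count))
    return averaged


# A sensor that reports every 10 ms; the value equals the timestamp here.
raw = [(t, float(t)) for t in range(0, 300, 10)]
slow = sample(raw, 100)          # one event per 100 ms
average = running_average(slow)  # derived from the thinned-out stream
```

The three lists correspond to the three rows of a marble diagram: raw is left untouched, slow holds one value per 100 ms window, and average holds the mean of all sampled values up to each point.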
Another strength of the library lies in its ability to combine event streams. Besides deriving new streams, it is also possible to consolidate several existing streams into one. The `Merge()` method inserts the events of a second stream into an existing event stream. A typical example is sensor data: the streams from the individual sensors are merged into a new, central stream, which is then processed by a dedicated thread.
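The sensor scenario can be sketched as follows. The Subject class and the merge function are hypothetical minimal stand-ins written for this example; a real Reactive Extensions library would provide its own merge operator and scheduling support. Handing events to a worker thread through a queue is one possible way to model the "dedicated thread", not the only one.

```python
import queue
import threading


class Subject:
    """Hypothetical minimal stream: forwards each value to its subscribers."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, on_next):
        self._callbacks.append(on_next)

    def emit(self, value):
        for callback in list(self._callbacks):
            callback(value)


def merge(*sources):
    """Return a new stream that carries the events of all source streams."""
    merged = Subject()
    for source in sources:
        source.subscribe(merged.emit)
    return merged


def demo():
    temperature, pressure = Subject(), Subject()
    central = merge(temperature, pressure)

    inbox = queue.Queue()
    central.subscribe(inbox.put)  # hand every event over to the worker

    processed = []

    def worker():
        while True:
            item = inbox.get()
            if item is None:      # sentinel: no more events
                break
            processed.append(item)

    thread = threading.Thread(target=worker)
    thread.start()

    temperature.emit(("temperature", 21.5))
    pressure.emit(("pressure", 1013))
    temperature.emit(("temperature", 21.6))

    inbox.put(None)
    thread.join()
    return processed
```

Because all events pass through a single queue, the worker sees them in the order in which the sensors emitted them, regardless of which sensor they came from.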
In addition, there are other useful methods that combine event streams. The `Zip()` method generates a new stream from two streams, which triggers an event only once both underlying streams have each triggered an event. Each event of the new stream therefore pairs one event from each source.
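A minimal sketch of this pairing behaviour, again with a hypothetical Subject class and a hypothetical zip_streams function rather than a real library operator. It assumes that events waiting for a partner are buffered in arrival order.

```python
from collections import deque


class Subject:
    """Hypothetical minimal stream: forwards each value to its subscribers."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, on_next):
        self._callbacks.append(on_next)

    def emit(self, value):
        for callback in list(self._callbacks):
            callback(value)


def zip_streams(left, right):
    """Emit a pair only when both sources have contributed one event each."""
    zipped = Subject()
    left_buffer, right_buffer = deque(), deque()

    def flush():
        # Pair up buffered events for as long as both sides have one waiting.
        while left_buffer and right_buffer:
            zipped.emit((left_buffer.popleft(), right_buffer.popleft()))

    def on_left(value):
        left_buffer.append(value)
        flush()

    def on_right(value):
        right_buffer.append(value)
        flush()

    left.subscribe(on_left)
    right.subscribe(on_right)
    return zipped


def demo():
    numbers, letters = Subject(), Subject()
    pairs = []
    zip_streams(numbers, letters).subscribe(pairs.append)

    numbers.emit(1)    # no partner yet, nothing is emitted
    numbers.emit(2)
    letters.emit("a")  # pairs with 1
    letters.emit("b")  # pairs with 2
    numbers.emit(3)    # waits for a third letter that never arrives
    return pairs
```

In demo(), the third number stays in the buffer because the second stream never delivers a matching event.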
Besides the ability to model "real" events, the Observable/Observer model offers another very interesting aspect. Using Observables, it's possible to map arrays/collections over time. "Over time" in this context means that the data is generally made available to the consumer asynchronously.
A comparison between the classic Iterator pattern and an Observable should clarify the difference. The Iterator is obtained from a provider (aggregate) and allows sequential data retrieval. Methods such as MoveNext() advance to the next element in the sequence. This is where a problem can arise, because MoveNext() blocks until the next element is ready. This can lead to long wait times, especially with I/O-based resources like an internet connection. In large applications or services, this can become problematic, as many threads sit in blocking calls and tie up resources that could be used more effectively elsewhere.
From the consumer's perspective, the data must be actively retrieved from the source. Therefore, this model is also known as a pull model.
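The pull model can be illustrated with a Python generator, where next() plays the role that MoveNext() plays in the text. The function names slow_source and consume_by_pulling, and the use of time.sleep to stand in for I/O latency, are assumptions made for this sketch.

```python
import time


def slow_source(values, delay_s=0.01):
    """Yield values one by one; the sleep stands in for I/O latency."""
    for value in values:
        time.sleep(delay_s)
        yield value


def consume_by_pulling(values):
    """Actively fetch each element; the caller waits inside next()."""
    received = []
    iterator = slow_source(values)
    while True:
        try:
            item = next(iterator)  # blocks this thread until an element is ready
        except StopIteration:
            break                  # the source has no more elements
        received.append(item)
    return received
```

While next() is waiting, the calling thread can do nothing else, which is exactly the cost described above.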
Wouldn't it be simpler if the consumer received the requested data as soon as it was available? This is precisely where the push model comes into play. Besides receiving the data, the consumer is also notified when no more data is available. If the source encounters an error or exception, this is reported as well. This push model is represented by Observables. With this concept, the consumer is no longer stuck waiting for the next element in a sequence. The use of schedulers also simplifies the coordination of concurrent operations, and the many operators in the Reactive Extensions make processing easy and convenient.
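For comparison, a minimal push-style sketch. The function push_source and its callback parameters are hypothetical, and a plain background thread stands in for what a scheduler would do in a real Reactive Extensions library.

```python
import threading
import time


def push_source(values, on_next, on_error, on_completed, delay_s=0.01):
    """Start producing on a background thread and return immediately."""

    def run():
        try:
            for value in values:
                time.sleep(delay_s)  # stands in for I/O latency
                on_next(value)       # data is pushed as soon as it is ready
        except Exception as error:
            on_error(error)          # any exception in the loop is reported
            return
        on_completed()               # no more data will follow

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


def demo():
    received = []
    done = threading.Event()

    push_source(
        [1, 2, 3],
        on_next=received.append,
        on_error=lambda error: done.set(),
        on_completed=done.set,
    )

    # The calling thread is not blocked here and could do other work.
    finished = done.wait(timeout=5.0)
    return received, finished
```

The call to push_source returns at once; the only place the caller waits in demo() is the explicit done.wait(), which is there just so the example can return its result.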
Since such data sources ultimately only represent event streams, they can be combined and connected with other streams as desired.
This provides the developer with a tool that allows them to easily and efficiently process, combine and (asynchronously) coordinate event streams.
This concept is therefore particularly suitable for software that works with a large number of events or is designed around them (keyword: "Event-Based Components"). But the library also proves its worth in the (asynchronous) processing of large amounts of data.
