SQR-034: EFD Operations

  • Angelo Fausti

Latest Revision: 2019-10-11

Note

This technote is not yet published.

This technote describes the EFD data flow, data retention policies, and related operational aspects of the EFD.

1   Introduction

DMTN-082 [3] presents a proposal to enable real-time analysis of the EFD data in the LSST Science Platform (LSP). SQR-029 [1] describes a prototype implementation of the EFD based on the Confluent Kafka Platform and the InfluxData stack, and reports results of live tests with the LSST T&S Service Abstraction Layer (SAL), including latency characterization and performance evaluation with high-frequency telemetry. SQR-031 [2] describes the EFD Kubernetes-based deployment using k3s, a lightweight Kubernetes distribution, which allowed us to exercise the deployment and operation of the EFD at the Auxiliary Telescope (AT) test stand in Tucson and at the Summit in Chile while we implement the final on-premise deployment platform.

In this technote, we describe new components added to the EFD architecture, data replication from the Summit to the LSST Data Facility (LDF), data storage options for long-term analyses of the EFD data in the LSP, and the EFD data retention policy.

Figure 1 EFD components at the Summit and the LDF.

2   The SAL Kafka producer

The SAL Kafka producer runs at the Summit and forwards DDS messages from one or more SAL components to the Kafka brokers. For each DDS topic, SAL Kafka introspects the OpenSplice IDL, creates the corresponding Avro schema, and uploads it to the Kafka Schema Registry dynamically. The Kafka brokers cache the Avro-serialized messages, and consumers use the Avro schemas created by SAL Kafka to deserialize the messages and write them to the different destinations.

SAL Kafka was an important addition to the EFD architecture: it decoupled the EFD from the SAL XML schemas and introduced Avro as the interface between the DDS middleware and Kafka.
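
The flow above can be illustrated with a minimal sketch of an Avro producer, assuming the confluent-kafka Python client. The topic name, schema, broker, and Schema Registry addresses are placeholders, and the schema would normally be generated from the OpenSplice IDL rather than written by hand.

    # Sketch of the producer pattern: serialize a message as Avro and publish
    # it to Kafka, registering the schema with the Schema Registry on first use.
    from confluent_kafka import SerializingProducer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroSerializer

    # Avro schema that SAL Kafka would derive from the OpenSplice IDL (hypothetical).
    value_schema = """
    {
      "type": "record",
      "name": "tel_example",
      "fields": [
        {"name": "private_sndStamp", "type": "double"},
        {"name": "value", "type": "double"}
      ]
    }
    """

    schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})
    avro_serializer = AvroSerializer(schema_registry, value_schema)

    producer = SerializingProducer({
        "bootstrap.servers": "kafka-broker:9092",
        "value.serializer": avro_serializer,
    })

    # Publish one message; consumers retrieve the schema from the registry
    # to deserialize it.
    producer.produce(topic="lsst.sal.Test.tel_example",
                     value={"private_sndStamp": 1570000000.0, "value": 1.23})
    producer.flush()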

3   The Kafka Connect manager

Another addition to the EFD architecture was the Kafka Connect manager, the component responsible for managing connectors through the Kafka Connect REST interface. It is used to deploy the different connectors in the EFD and can automatically update a connector's configuration when new topics are created in the Kafka brokers.
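
As an illustration of what the Kafka Connect manager automates, the sketch below adds a newly created topic to a connector's configuration through the standard Kafka Connect REST interface. The connector name, topic, and endpoint URL are hypothetical.

    # Sketch of updating a connector configuration via the Kafka Connect REST API.
    import requests

    CONNECT_URL = "http://kafka-connect:8083"   # assumed Kafka Connect endpoint
    CONNECTOR = "influxdb-sink"                 # hypothetical connector name

    # Fetch the current connector configuration.
    config = requests.get(f"{CONNECT_URL}/connectors/{CONNECTOR}/config").json()

    # Add newly discovered topics (hard-coded here for illustration) and push
    # the updated configuration back; Connect restarts the tasks as needed.
    new_topics = ["lsst.sal.ATDome.tel_position"]
    topics = set(filter(None, config.get("topics", "").split(",")))
    config["topics"] = ",".join(sorted(topics.union(new_topics)))

    resp = requests.put(f"{CONNECT_URL}/connectors/{CONNECTOR}/config", json=config)
    resp.raise_for_status()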

4   Data replication and fault tolerance

The EFD uses Kafka to replicate data from the Summit (primary site) to the LDF (secondary site). The Confluent Replicator connector is the component responsible for this replication. In the EFD setup, the Replicator connector runs in one direction, copying data from the primary site to the secondary site.

New topics in the Kafka brokers at the Summit are automatically detected and replicated to the Kafka brokers at the LDF. As throughput increases, the Replicator automatically scales to accommodate the increased load. The Replicator also synchronizes the Schema registry across the two sites, which further protects the EFD against data loss.

Note that in the present setup an InfluxDB consumer reads data from the Kafka cluster at the primary site, while the Aggregator, InfluxDB, Oracle, and Parquet consumers read data from the Kafka cluster at the secondary site. Within each Kafka cluster, fault tolerance is obtained by replicating the Kafka topics across the brokers; the SAL Kafka producer creates topics with a replication factor of 3 by default. If one of the InfluxDB instances dies, we can still connect to the other InfluxDB instance for real-time analysis of the EFD data. However, there is no failover mechanism for a consumer at the primary (secondary) site to read data from the secondary (primary) site. Similarly, the SAL Kafka producer writes data only to the primary site.
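
For illustration, the sketch below creates a topic with a replication factor of 3 using the confluent-kafka AdminClient; the broker address and topic name are placeholders, and the SAL Kafka producer performs the equivalent step internally.

    # Sketch of creating a Kafka topic replicated across three brokers.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": "kafka-broker:9092"})

    topic = NewTopic("lsst.sal.Test.tel_example",
                     num_partitions=1,
                     replication_factor=3)  # tolerates the loss of up to two brokers

    # create_topics() returns a dict mapping topic name to a Future.
    for name, future in admin.create_topics([topic]).items():
        future.result()  # raises if the topic could not be created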

In summary, the data replication enables long-term storage of the EFD data at the LDF and also provides a live backup of the EFD data in the Summit instance (see 5   Downsampling and data retention).

5   Downsampling and data retention

The EFD writes thousands of topics with frequencies ranging from 1Hz to 100Hz. Querying the raw EFD data over large time windows can be quite painful, especially at the Summit, where computing resources are limited.

A natural solution is to downsample the raw data and store one or two low-resolution versions for extended periods. In InfluxDB, it is possible to configure multiple retention policies. For instance, at the Summit we can keep 1 week of raw data, 1 month of an intermediate-resolution version, and 1 year of a low-resolution version. Data older than the retention period is automatically deleted, so the result is a moving time window over the most recent data in each case. Downsampling is done efficiently inside InfluxDB using Flux tasks that can be scheduled during daytime. Similar retention policies can be configured at the LDF so that we can query the data efficiently over extended periods in InfluxDB.
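
The sketch below illustrates this layout using the InfluxDB 1.x Python client and InfluxQL rather than the Flux tasks mentioned above; the database name, retention policy names, durations, and measurement are placeholders.

    # Sketch of the retention-policy layout and a downsampling query.
    from influxdb import InfluxDBClient

    client = InfluxDBClient(host="influxdb", port=8086, database="efd")

    # Raw data is kept for one week; downsampled versions are kept longer.
    client.create_retention_policy("raw", "1w", replication="1",
                                   database="efd", default=True)
    client.create_retention_policy("one_month", "30d", replication="1", database="efd")
    client.create_retention_policy("one_year", "52w", replication="1", database="efd")

    # Downsample raw telemetry into the one-month policy; a scheduled task
    # would run a query like this periodically.
    client.query(
        'SELECT mean(*) INTO "one_month"."lsst.sal.Test.tel_example" '
        'FROM "raw"."lsst.sal.Test.tel_example" '
        'WHERE time > now() - 1d GROUP BY time(1m), *'
    )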

Real-time analysis of the EFD data might include statistical models for anomaly detection and forecasting. For example, InfluxDB implements a built-in multiplicative Holt-Winters function to generate predictions on time series data. At the Summit, if we store 1 month of raw EFD data, that is roughly 1% of the data collected over the 10-year survey. Whether that is sufficient to build a statistical model depends on the long-term trends and seasonality of the time series we are analyzing. An interesting possibility of the present EFD architecture is to build the statistical models at the LDF, where the raw data is stored for longer periods, and apply them at the Summit when configuring alerts.
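
As an example of the built-in forecasting, a Holt-Winters query could be issued as sketched below; the measurement, field, and parameters (10 forecast points, a seasonal pattern of 24 one-hour intervals) are illustrative only.

    # Sketch of an InfluxQL Holt-Winters forecast on a telemetry field.
    from influxdb import InfluxDBClient

    client = InfluxDBClient(host="influxdb", port=8086, database="efd")
    result = client.query(
        'SELECT HOLT_WINTERS(MEAN("value"), 10, 24) '
        'FROM "lsst.sal.Test.tel_example" '
        'WHERE time > now() - 7d GROUP BY time(1h)'
    )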

6   The Aggregator

As proposed in DMTN-082 [3], LSP users are generally interested in telemetry data at a frequency closer to the cadence of the observations. It proposes that “all telemetry topics sampled with a frequency higher than 1Hz are (1) downsampled at 1Hz and (2) aggregated to 1Hz using general statistics like min, max, mean, median, stdev”. Command and event topics should not be aggregated, as they are typically low-frequency and can be read directly from the raw EFD data sources.

In addition, the aggregator should resample the telemetry topics onto a regular time grid to make it easier to correlate them.

The aggregator stream processor produces a new set of aggregated telemetry topics in Kafka that can be consumed and stored in Parquet, Oracle, and InfluxDB. That gives the user multiple options to combine the aggregated telemetry with the exposure table, which resides primarily in the Oracle database:

  • inside the LSP notebook environment using Pandas dataframes after querying the exposure table and reading the telemetry data from one of the sources above;
  • inside the Oracle database joining the exposure and the telemetry tables using SQL;
  • inside InfluxDB using the Flux sql.from() function to retrieve data from the exposure table.

All these “joins” are based on timestamps.
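
As a sketch of the first option, the aggregated telemetry can be matched to exposures on timestamps with pandas.merge_asof; the column names and values below are made up.

    # Sketch of a timestamp-based join between exposures and 1Hz telemetry.
    import pandas as pd

    exposures = pd.DataFrame({
        "obs_start": pd.to_datetime(["2019-10-11T03:00:00", "2019-10-11T03:00:30"]),
        "exposure_id": [1001, 1002],
    })

    telemetry = pd.DataFrame({
        "time": pd.to_datetime(["2019-10-11T02:59:59", "2019-10-11T03:00:29"]),
        "temperature_mean": [5.2, 5.3],
    })

    # Match each exposure with the most recent telemetry sample.
    joined = pd.merge_asof(exposures.sort_values("obs_start"),
                           telemetry.sort_values("time"),
                           left_on="obs_start", right_on="time",
                           direction="backward")
    print(joined)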

An interesting option for implementing the Aggregator is Faust, a Python asyncio stream processing library. Faust supports Avro serialization and multiple instances of a Faust worker can be started independently to distribute stream processing across nodes or CPU cores.
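
A minimal sketch of such a Faust worker is shown below, aggregating a high-frequency telemetry topic to 1Hz summary statistics. The topic names, record fields, broker address, and the use of JSON instead of Avro serialization are simplifications, not the actual Aggregator implementation.

    # Sketch of a Faust agent that emits 1Hz summary statistics for a telemetry topic.
    import statistics
    import faust


    class Telemetry(faust.Record, serializer="json"):  # Avro would be used in practice
        private_sndStamp: float
        value: float


    app = faust.App("efd-aggregator", broker="kafka://kafka-broker:9092")

    src = app.topic("lsst.sal.Test.tel_example", value_type=Telemetry)
    dst = app.topic("lsst.sal.Test.tel_example.aggregated")


    @app.agent(src)
    async def aggregate(stream):
        # Collect up to one second's worth of samples, then emit summary statistics.
        async for batch in stream.take(100, within=1.0):
            values = [msg.value for msg in batch]
            await dst.send(value={
                "time": batch[-1].private_sndStamp,
                "min": min(values),
                "max": max(values),
                "mean": statistics.mean(values),
                "median": statistics.median(values),
                "stdev": statistics.stdev(values) if len(values) > 1 else 0.0,
            })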

7   Options for long-term storage at the LDF

The LSP benefits from accessing data stored in Parquet format, which is compatible with Dask, used to scale computations across multiple machines. The Confluent Kafka Connect storage cloud (S3 Sink) connector recently added support for Parquet on S3. Through the connector configuration, it is also possible to partition the data based on time. We might want to store both the raw EFD data and the aggregated EFD data in Parquet files, which also serve as a cold backup of the EFD data.
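
A hedged sketch of such a connector configuration, registered through the Kafka Connect REST interface, is shown below. The bucket name, topic pattern, and endpoint are placeholders, and the configuration is not complete; see the connector documentation for the required storage, region, and converter settings.

    # Sketch of registering an S3 sink connector that writes time-partitioned Parquet files.
    import requests

    config = {
        "name": "s3-parquet-sink",
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "topics.regex": "lsst.sal.*",
            "s3.bucket.name": "efd-long-term",
            "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
            "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
            "partition.duration.ms": "3600000",
            "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
            "locale": "en-US",
            "timezone": "UTC",
            "flush.size": "1000",
        },
    }

    resp = requests.post("http://kafka-connect:8083/connectors", json=config)
    resp.raise_for_status()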

We plan on storing the aggregated EFD data in Oracle, which is convenient for joins with the exposure table, as discussed in the 6   The Aggregator section. The Kafka Connect JDBC Sink connector supports Oracle databases through the JDBC driver for Oracle. The JDBC Sink connector automatically creates the destination tables if the auto.create configuration option is enabled, and can also perform limited auto-evolution of the destination tables if the auto.evolve configuration option is enabled. An alternative is to load the data into the Oracle database from Parquet files in batch, but then we lose the convenience of creating and evolving the database schema offered by the JDBC Sink connector.
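
For illustration, a JDBC Sink connector for Oracle could be registered as sketched below; the connection URL, credentials, and topic list are placeholders and the configuration is not exhaustive.

    # Sketch of registering a JDBC Sink connector with auto.create and auto.evolve enabled.
    import requests

    config = {
        "name": "oracle-jdbc-sink",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:oracle:thin:@//oracle-host:1521/efd",  # hypothetical
            "connection.user": "efd",
            "connection.password": "********",
            "topics": "lsst.sal.Test.tel_example.aggregated",
            "auto.create": "true",   # create destination tables automatically
            "auto.evolve": "true",   # allow limited schema evolution
            "insert.mode": "insert",
        },
    }

    resp = requests.post("http://kafka-connect:8083/connectors", json=config)
    resp.raise_for_status()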

For the InfluxDB instance at the LDF, we can store the raw data for more extended periods than at the Summit. We might consider InfluxDB Enterprise to build an InfluxDB cluster, or pay for InfluxDB Cloud. Alternatively, we can have multiple retention policies in InfluxDB and store low-resolution versions of the data for extended periods, as discussed in the 5   Downsampling and data retention section.

8   Monitoring

For monitoring the Kafka cluster, we use Prometheus deployed with the Confluent Kafka Helm charts and, eventually, the Confluent Kafka Control Center. For InfluxDB, we collect system metrics from a number of Telegraf plugins. We also intend to ingest the EFD logs into the logging infrastructure at the Summit and at the LDF.

9   References

[1] SQR-029. Frossie Economou. DM-EFD prototype implementation. 2019. URL: https://sqr-029.lsst.io
[2] SQR-031. Angelo Fausti. EFD deployment. 2019. URL: https://sqr-031.lsst.io
[3] DMTN-082. Simon Krughoff. On accessing EFD data in the Science Platform. 2018. URL: https://dmtn-082.lsst.io