SQR-034: EFD Operations

  • Angelo Fausti

Latest Revision: 2020-06-30

1   Endpoints for EFD deployments

The following sections list the EFD instances available, where they run and who the intended audience is for each one.

The main entry point to the EFD is Chronograf, where users can make queries and create dashboards against the EFD.

Note

To log in to Chronograf, you need to be a member of the GitHub lsst-sqre organization. If you are not, please drop a line on the #com-square LSSTC Slack channel.

The Confluent Schema Registry and the Kafka Broker are used to configure the SAL Kafka producers to send data to an EFD instance. Note that for instances where the SAL Kafka producers are deployed on Kubernetes, we use the internal address for the broker: cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092.

If you plan on accessing EFD data from a notebook, see documentation on using the EFD Client.
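For scripted access outside a notebook, the InfluxDB HTTP API endpoints listed in the following sections can also be queried directly. The sketch below only builds the request URL, assuming an InfluxDB 1.x-style /query endpoint that takes the database (db) and an InfluxQL statement (q) as parameters. The database name efd, the topic name, and the helper build_query_url are assumptions made for illustration, and authentication is left out.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_query_url(base_url, database, influxql):
    """Return the URL of an InfluxDB 1.x-style /query request."""
    params = urlencode({"db": database, "q": influxql})
    return f"{base_url.rstrip('/')}/query?{params}"


# Hypothetical database and topic names, for illustration only.
url = build_query_url(
    "https://influxdb-summit-efd.lsst.codes",
    "efd",
    'SELECT * FROM "lsst.sal.ATDome.position" WHERE time > now() - 1h',
)
print(url)
```

The resulting URL can be passed to any HTTP client. The EFD Client remains the documented route for notebook users.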

1.1   Summit EFD

Instance running at the Summit (Chile) on k3s (“kubes”).

Intended audience: Commissioning and Science Verification teams.

Data at the Summit EFD is also replicated to the LDF EFD to enable project-wide access.

  • Chronograf: https://chronograf-summit-efd.lsst.codes
  • InfluxDB HTTP API: https://influxdb-summit-efd.lsst.codes
  • Confluent Schema Registry: https://schema-registry-summit-efd.lsst.codes
  • Kafka Broker: kafka-0-summit-efd.lsst.codes:31090
  • Kafdrop UI: https://kafdrop-summit-efd.lsst.codes

1.2   Base EFD

Instance running at the Base facility (Chile) on the Kueyen k8s cluster.

Intended audience: Commissioning and Science Verification teams.

The plan is to have replication from the summit to the base and from the base to LDF working soon.

  • Chronograf: https://chronograf-base-efd.lsst.codes
  • InfluxDB HTTP API: https://influxdb-base-efd.lsst.codes
  • Confluent Schema Registry: https://schema-registry-base-efd.lsst.codes
  • Kafka Broker: cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092
  • Kafdrop UI: https://kafdrop-base-efd.lsst.codes

1.3   NCSA test stand EFD

Standalone instance running at the NCSA test stand.

Intended audience: Telescope and Site Team.

  • Chronograf: https://lsst-chronograf-nts-efd.ncsa.illinois.edu
  • InfluxDB HTTP API: https://lsst-influxdb-nts-efd.ncsa.illinois.edu
  • Confluent Schema Registry: https://lsst-schema-registry-nts-efd.ncsa.illinois.edu
  • Kafka Broker: cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092
  • Kafdrop UI: https://lsst-kafka-0-nts-efd.ncsa.illinois.edu

1.4   RSP Integration EFD

Instance running at NCSA on the RSP integration k8s cluster.

Note

As of March 20, this instance holds a copy of the Summit EFD data and dashboards and can be used during the shutdown of the Rubin Observatory caused by the COVID-19 outbreak.

Intended audience: Commissioning and Science Verification teams.

  • Chronograf: https://lsst-chronograf-int-efd.ncsa.illinois.edu
  • InfluxDB HTTP API: https://lsst-influxdb-int-efd.ncsa.illinois.edu
  • Confluent Schema Registry: https://lsst-schema-registry-int-efd.ncsa.illinois.edu
  • Kafka Broker: cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092

1.5   RSP Stable EFD

Instance running at NCSA on the RSP stable k8s cluster.

Intended audience: Everyone in the project.

  • Chronograf: https://lsst-chronograf-efd.ncsa.illinois.edu
  • InfluxDB HTTP API: https://lsst-influxdb-efd.ncsa.illinois.edu
  • Confluent Schema Registry: https://lsst-schema-registry-efd.ncsa.illinois.edu
  • Kafka Broker: cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092

1.6   Tucson test stand EFD

Standalone instance running at the Tucson test stand.

Intended audience: Telescope and Site Team.

  • Chronograf: https://chronograf-tucson-teststand-efd.lsst.codes
  • InfluxDB HTTP API: https://influxdb-tucson-teststand-efd.lsst.codes
  • Confluent Schema Registry: https://schema-registry-tucson-teststand-efd.lsst.codes
  • Kafka Broker: kafka-0-tucson-teststand-efd.lsst.codes:31090

1.7   Sandbox EFD at Google Cloud Platform

Standalone instance running at GCP for sandbox activities.

Intended audience: Any group interested in having a testing environment.

  • Chronograf: https://chronograf-sandbox-efd.lsst.codes
  • InfluxDB HTTP API: https://influxdb-sandbox-efd.lsst.codes
  • Confluent Schema Registry: https://schema-registry-sandbox-efd.lsst.codes
  • Kafka Broker: cp-helm-charts-cp-kafka-headless.cp-helm-charts:9092
  • Kafdrop UI: https://kafdrop-sandbox-efd.lsst.codes

Follow #com-square on the LSSTC Slack for updates.

2   Introduction

In DMTN-082 [3], we present a high-level architecture to enable real-time analysis of the Engineering and Facilities Database (EFD) data in the Rubin Science Platform (RSP).

In SQR-029 [1], we describe the implementation of the EFD based on Kafka and InfluxDB. We report results of live tests with the LSST T&S Service Abstraction Layer (SAL) including latency characterization and performance evaluation with high-frequency telemetry.

Finally, in SQR-031 [2], we describe a Kubernetes-based deployment of the EFD using k3s (“kubes”), a lightweight Kubernetes distribution, which allows us to use the EFD in non-production environments, including single-node deployments such as test stand environments.

In this technote, we describe the EFD operation with 1) an instance at the Summit to store the data and to enable real-time analysis for observers, and 2) an instance at the US Data Facility (USDF) that replicates the data from the Summit and stores it for the long term. The EFD at the USDF is meant to be a centralized place where the Rubin staff can connect and perform their analysis without interfering with the Summit instance. One of the benefits of the present architecture is that it makes EFD data available at the USDF with latency under 1 second.

_images/efd_architecture.png

Figure 1 Data flow from the Summit to the USDF.

As seen in the first section, there are more deployments than just the Summit and the USDF, for various reasons. However, these two are the minimal deployments required for the EFD operation.

The main components of the EFD at the Summit are Kafka, InfluxDB, the InfluxDB Sink connector, Chronograf and Kapacitor. The SAL Kafka producers are managed by the T&S team and are currently deployed on k8s in most of the environments.

At the USDF, we also have the replicator, the aggregator and the connectors that write data to Parquet files in an object store and to a traditional RDBMS.

We also describe new components added to the EFD architecture. In particular, we discuss data replication, retention policies, and options for long-term storage of the EFD data.

3   The SAL Kafka producer

The SAL Kafka producers forward DDS messages from one or more SAL components to Kafka. For each DDS topic, SAL Kafka introspects the OpenSplice IDL, creates the Avro schema and uploads it to the Kafka Schema Registry dynamically. The Kafka brokers cache the Avro serialized messages, and consumers use the Avro schemas created by SAL Kafka to deserialize them.
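As a rough illustration of the schema-creation step, the sketch below turns a list of (field name, IDL type) pairs into an Avro record schema. It is not the SAL Kafka implementation: the type mapping, the namespace, the topic name and the field names are all assumptions, and real SAL topics carry more fields and types than shown here.

```python
import json

# Hypothetical mapping from IDL primitive types to Avro primitive types.
IDL_TO_AVRO = {
    "double": "double",
    "float": "float",
    "long": "int",        # IDL long is 32 bits
    "long long": "long",  # IDL long long is 64 bits
    "boolean": "boolean",
    "string": "string",
}


def avro_schema(topic, fields, namespace="lsst.sal"):
    """Build an Avro record schema from (name, idl_type) pairs."""
    return {
        "type": "record",
        "name": topic,
        "namespace": namespace,
        "fields": [
            {"name": name, "type": IDL_TO_AVRO[idl_type]}
            for name, idl_type in fields
        ],
    }


# Hypothetical topic and field names, for illustration only.
schema = avro_schema(
    "ATDome_position",
    [("private_sndStamp", "double"), ("azimuthPosition", "float")],
)
print(json.dumps(schema, indent=2))
```

A schema of this shape is what a producer would register with the Schema Registry before sending Avro-serialized messages for the topic.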

SAL Kafka was an important addition to the EFD architecture: it decouples the EFD from the SAL XML schemas and introduces Avro as the interface between the DDS middleware and the EFD.

4   The Kafka Connect manager

Another addition to the EFD architecture is the Kafka Connect manager. The Kafka Connect manager is the component responsible for managing the Kafka Connect REST interface. It is used to deploy the different connectors to the EFD. For connectors that are not dynamic, like the InfluxDB Sink and the JDBC Sink connectors, the Kafka Connect manager can automatically update the connector configuration when new topics are created in Kafka.
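The automatic update can be pictured as follows: list the topics in Kafka, select the ones the connector should handle, and write the refreshed configuration back through the Kafka Connect REST interface (PUT /connectors/{name}/config). The sketch below is not the Kafka Connect manager code. The helper names, the topic pattern, the connector name and the Connect URL are assumptions, and the request is built but never sent.

```python
import json
import re
from urllib.request import Request


def updated_config(config, kafka_topics, pattern):
    """Return a copy of config whose 'topics' lists every matching topic."""
    matching = sorted(t for t in kafka_topics if re.fullmatch(pattern, t))
    return {**config, "topics": ",".join(matching)}


def config_request(connect_url, name, config):
    """Build (without sending) the PUT request that applies a connector config."""
    return Request(
        f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=json.dumps(config).encode(),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Hypothetical connector configuration and topic names.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "lsst.sal.ATDome.position",
}
kafka_topics = [
    "lsst.sal.ATDome.position",
    "lsst.sal.ATDome.logevent_heartbeat",  # newly created topic
    "_schemas",                            # internal topic, ignored
]
new_config = updated_config(config, kafka_topics, r"lsst\.sal\..*")
request = config_request("http://localhost:8083", "jdbc-sink", new_config)
print(request.get_method(), request.full_url)
print(new_config["topics"])
```

Running this on a schedule, or whenever the topic list changes, keeps a static connector in step with the topics that exist in Kafka.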

5   Data replication and fault tolerance

The EFD uses Kafka to replicate data from the Summit EFD (primary site) to the USDF EFD (secondary site). The Kafka Connect Replicator source connector is the component responsible for that. In the EFD setup, the Replicator source connector runs in one direction, pulling topics from the primary site to the secondary site.

New topics and schemas in the Summit EFD are automatically detected and replicated to the USDF EFD. As throughput increases, the Replicator automatically scales to accommodate the increased load. Replicating topics and schemas across primary and secondary sites further protects the EFD against data loss.

Consumers at the Summit only read data from the primary site and consumers at the USDF only read data from the secondary site, with the exception of the Replicator. Within the Kafka cluster, we have fault tolerance by replicating the Kafka topics across three brokers (the default setup). That’s done by the SAL Kafka producer creating topics with a replication factor of three.

If the InfluxDB instance in one of the primary sites dies, the InfluxDB instance on the secondary site can be used to access the data. However, there’s no failover mechanism that automatically connects a consumer to the secondary site.

In summary, the USDF EFD provides long-term storage and a live backup of the EFD data (see the following section for details).

6   Downsampling and data retention

The EFD writes thousands of topics with frequencies ranging from 1 Hz to 100 Hz. Querying the raw EFD data on large time windows can be resource intensive, especially at the primary sites with limited computing resources.

A natural solution is to downsample the raw data and store one or two versions of low-resolution data for extended periods. In InfluxDB, it is possible to configure multiple retention policies. For instance, at the primary sites we can have 1 week of raw data, 1 month of an intermediate-resolution version of the data, and 1 year of a low-resolution version of the data. Under a retention policy, data older than the retention period is automatically deleted. The result is a moving time window on the most recent data in each case. Downsampling is efficiently done inside InfluxDB using Flux tasks, which can be scheduled during daytime if necessary to keep them from interfering with nighttime activities. Similar retention policies can be configured at the USDF so that we can query the data efficiently over extended periods.
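The moving-window behaviour of such tiers can be sketched in a few lines. The example below uses the three example tiers from the text and picks the highest-resolution tier that still holds data of a given age. Approximating 1 month as 30 days and 1 year as 365 days, the tier names, and the helper select_tier are assumptions made for illustration. This is not how InfluxDB resolves retention policies internally.

```python
from datetime import timedelta

# Example tiers from the text, ordered from highest to lowest resolution.
# Approximating 1 month as 30 days and 1 year as 365 days is an assumption.
TIERS = [
    ("raw", timedelta(weeks=1)),
    ("intermediate", timedelta(days=30)),
    ("low", timedelta(days=365)),
]


def select_tier(lookback):
    """Return the highest-resolution tier that still holds data `lookback` ago."""
    for name, retention in TIERS:
        if lookback <= retention:
            return name
    return None  # older than every retention period: the data has been deleted


print(select_tier(timedelta(days=3)))    # raw
print(select_tier(timedelta(days=20)))   # intermediate
print(select_tier(timedelta(days=400)))  # None
```

A dashboard or query helper could use the same rule to route a query to the appropriate retention policy based on the requested time range.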

Real-time analysis of the EFD data might include statistical models for anomaly detection and forecasting. For example, InfluxDB implements a built-in multiplicative Holt-Winters function to generate predictions on time series data. At the Summit, if we store 1 week of raw EFD data, that’s roughly 0.2% of the data collected over the 10-year survey. Whether that’s sufficient to build a statistical model or not depends on the long-term trends and seasonality of the time series we are analyzing. An interesting possibility of the present EFD architecture is to build the statistical models from historical data at the USDF EFD and apply the models to the Summit EFD when configuring alarms.
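The 0.2% figure follows from dividing one week by the length of the survey, assuming a roughly constant data rate. A quick check, using 365.25 days per year as an assumption:

```python
raw_retention_days = 7
survey_days = 10 * 365.25  # 10-year survey, assuming 365.25 days per year

fraction = raw_retention_days / survey_days
print(f"{fraction:.2%}")  # 0.19%, i.e. roughly 0.2%
```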

7   The Aggregator

As proposed in DMTN-082 [3], RSP users are generally expected to be interested in telemetry data at a frequency closer to the cadence of the observations. That technote proposes that “all telemetry topics sampled with a frequency higher than 1Hz are (1) downsampled at 1Hz and (2) aggregated to 1Hz using general statistics like min, max, mean, median stdev”. Command and event topics should not be aggregated as they are typically low-frequency and can be read directly from the raw EFD data sources.

In addition, the aggregator resamples the telemetry topics onto a regular time grid to make it easier to correlate them.
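The aggregation itself amounts to binning samples into 1-second windows aligned to whole seconds and computing summary statistics per window. The following is a minimal batch sketch of that idea using only the standard library. It is not the aggregator's streaming implementation, and the function name and sample values are hypothetical.

```python
import math
import statistics
from collections import defaultdict


def aggregate_1hz(samples):
    """Group (timestamp_seconds, value) samples into 1-second bins.

    Returns {bin_start: {"min", "max", "mean", "median", "stdev"}}, with
    bins aligned to whole seconds so that topics share the same time grid.
    """
    bins = defaultdict(list)
    for timestamp, value in samples:
        bins[math.floor(timestamp)].append(value)
    result = {}
    for start in sorted(bins):
        values = bins[start]
        result[start] = {
            "min": min(values),
            "max": max(values),
            "mean": statistics.fmean(values),
            "median": statistics.median(values),
            # The sample standard deviation needs at least two points.
            "stdev": statistics.stdev(values) if len(values) > 1 else 0.0,
        }
    return result


# Hypothetical 4 Hz telemetry covering two seconds.
samples = [(0.00, 1.0), (0.25, 2.0), (0.50, 3.0), (0.75, 4.0),
           (1.00, 5.0), (1.25, 5.0), (1.50, 5.0), (1.75, 5.0)]
aggregated = aggregate_1hz(samples)
for start, stats in aggregated.items():
    print(start, stats)
```

Because every topic is binned on the same whole-second grid, aggregated values from different topics can be compared row by row.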

The aggregator stream-processor produces a new set of aggregated telemetry topics in Kafka that are consumed and stored in Parquet, InfluxDB and in an RDBMS. That gives the user multiple options to combine the aggregated telemetry with the exposure table which resides in the USDF consolidated database:

  • inside the RSP notebook environment, using Pandas data frames after querying the exposure table and reading the telemetry data from Parquet files in an object store;
  • inside the consolidated database, by joining the exposure and the telemetry tables using SQL;
  • inside InfluxDB, using the Flux sql.from() function to retrieve data from the exposure table.

All these “joins” are based on timestamps.
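A timestamp-based join can be sketched without any particular backend: for each exposure, select the telemetry samples that fall between its start and end times and summarize them. The table layouts, column order and values below are hypothetical, and the mean is just one possible summary.

```python
from bisect import bisect_left, bisect_right


def join_by_time(exposures, telemetry):
    """Attach to each exposure the mean telemetry value within its time span.

    exposures: list of (exposure_id, start, end) tuples
    telemetry: list of (timestamp, value) tuples sorted by timestamp
    """
    times = [t for t, _ in telemetry]
    joined = {}
    for exposure_id, start, end in exposures:
        lo = bisect_left(times, start)   # first sample at or after start
        hi = bisect_right(times, end)    # one past the last sample at or before end
        values = [v for _, v in telemetry[lo:hi]]
        joined[exposure_id] = sum(values) / len(values) if values else None
    return joined


# Hypothetical aggregated telemetry (1 Hz) and exposure table.
telemetry = [(0, 10.0), (1, 12.0), (2, 14.0), (3, 16.0), (4, 18.0)]
exposures = [(101, 0, 2), (102, 3, 4), (103, 10, 12)]
joined = join_by_time(exposures, telemetry)
print(joined)
```

The same logic maps to a range join in SQL or to a time-based merge on data frames. Exposures with no telemetry in their time span get no value.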

The Kafka Aggregator is implemented in Faust, a Python asyncio stream processing library. Faust supports Avro serialization and multiple instances of a Faust worker can be started independently to distribute stream processing across nodes or CPU cores.

8   Options for long-term storage at the LDF

The RSP benefits from accessing data stored in Parquet format. Parquet is compatible with Dask, a library used to scale computations across multiple worker nodes. The Confluent Amazon S3 Sink connector supports Parquet on S3. From the connector configuration, it is possible to partition data based on time. We might want to store both the raw EFD data and the aggregated EFD data in Parquet files. This would serve as a live backup of the full raw EFD data.
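Time-based partitioning means that the object path of each Parquet file is derived from the record timestamps. The sketch below computes an hourly partition path in UTC. The topics/ prefix, the Hive-style year=/month=/day=/hour= layout, the topic name and the helper partition_path are assumptions for illustration. The actual layout depends on the connector configuration.

```python
from datetime import datetime, timezone


def partition_path(topic, timestamp_ms):
    """Return an hourly, Hive-style partition path for a record timestamp."""
    t = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return f"topics/{topic}/year={t:%Y}/month={t:%m}/day={t:%d}/hour={t:%H}"


# 1593475200000 ms since the Unix epoch is 2020-06-30 00:00:00 UTC.
path = partition_path("lsst.sal.ATDome.position", 1593475200000)
print(path)
```

With a layout like this, a reader such as Dask can limit a query to the partitions that overlap the requested time range.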

We plan on storing the aggregated EFD data in the USDF consolidated database, which is convenient for making joins with the exposure table, as discussed in section 7, The Aggregator. The Kafka Connect JDBC connector supports connections to all popular RDBMS implementations. The JDBC Sink connector automatically creates the destination tables if the auto.create configuration option is enabled, and can also perform limited auto-evolution on the destination tables if the auto.evolve configuration option is enabled. An alternative is to load data into the consolidated database from Parquet files in batch. The trade-off is that we lose the convenience of creating and evolving the database schema offered by the JDBC Sink connector.

We can store the raw data for more extended periods at the USDF than at the Summit simply because storage and processing are concentrated at the USDF. We will tune multiple retention policies in InfluxDB and store low-resolution versions of the data at the Summit to allow for access to longer time windows, as discussed in section 6, Downsampling and data retention.

9   Monitoring

For monitoring the Kafka cluster, we use the Kafdrop UI and also monitor JMX metrics exposed by the Confluent Platform. JMX is a common technology in the Java world for exporting application metrics. Confluent Kafka components use JMX APIs to collect application and JVM metrics and expose them over HTTP in a format that Prometheus understands and can scrape. We then use the Telegraf Prometheus input plugin to write these metrics to InfluxDB and create a Kafka monitoring dashboard in Chronograf.
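To give a feel for the format being scraped, the sketch below parses one sample line of the Prometheus text exposition format into a metric name, a label dictionary and a value. It is a simplified reader, not what Telegraf does internally: it ignores optional timestamps and escaped quotes in label values, and the metric name in the example is hypothetical.

```python
import re

# name{label="value",...} value   (the label block is optional)
SAMPLE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)'
    r'(?:\{(?P<labels>.*)\})?'
    r'\s+(?P<value>\S+)'
)
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_sample(line):
    """Parse one exposition-format sample; return None for comments or blanks."""
    match = SAMPLE.match(line.strip())
    if match is None:
        return None  # "# HELP", "# TYPE" and empty lines do not match
    labels = dict(LABEL.findall(match.group("labels") or ""))
    return match.group("name"), labels, float(match.group("value"))


# Hypothetical metric name and labels, for illustration only.
line = 'kafka_server_messages_in_total{topic="lsst.sal.ATDome.position"} 42.0'
parsed = parse_sample(line)
print(parsed)
```

Each parsed sample carries what is needed to write a point to InfluxDB: the metric name, the labels as tags, and the value as a field.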

For monitoring InfluxDB itself, we collect system and InfluxDB metrics using Telegraf as well.

We plan on ingesting the EFD logs into the logging infrastructure at the Summit and USDF too.

10   Appendix A - Configuring the Kafka Connect Replicator source connector

We’ve added the Kafka Connect Replicator source connector version 5.3.1 to our Kafka Connect container image and tested topic replication and schema migration.

In this setup, the topic replication works in one direction. The Replicator source connector consumes topics from the source cluster and the Kafka Connect workers produce topics to the destination cluster. Replicated topics are namespaced to indicate their origin. For example, summit.{topic} indicates that the topic is replicated from the Summit EFD, etc.

Schema migration follows the continuous migration model. The Replicator continuously copies schemas from the source cluster to the destination cluster Schema Registry, which is set to IMPORT mode. Schema translation ensures that subjects are renamed following the topic rename strategy when migrated to the destination Schema Registry.
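The effect of topic renaming and subject translation can be sketched as two small functions. This only illustrates the naming convention, not the Replicator implementation: it assumes subjects are named "<topic>-key" or "<topic>-value", and the helper names and the topic name are hypothetical.

```python
def rename_topic(topic, origin):
    """Namespace a replicated topic with its origin, e.g. summit.{topic}."""
    return f"{origin}.{topic}"


def translate_subject(subject, origin):
    """Rename a Schema Registry subject so it matches the renamed topic.

    Assumes subjects are named '<topic>-key' or '<topic>-value'.
    """
    topic, sep, suffix = subject.rpartition("-")
    if sep and suffix in ("key", "value"):
        return f"{rename_topic(topic, origin)}-{suffix}"
    return subject  # leave any other subject untouched in this sketch


print(rename_topic("lsst.sal.ATDome.position", "summit"))
print(translate_subject("lsst.sal.ATDome.position-value", "summit"))
```

Keeping the subject name aligned with the renamed topic lets consumers at the destination look up the schema for a replicated topic in the usual way.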

An example of configuration for the Replicator that includes topic and schema replication with schema translation can be found here.

_images/replicator_connector.png

Note that the Kafka Connect bootstrap.servers configuration must include the URL of the destination Kafka cluster and that the destination Schema Registry must be in IMPORT mode. To initialize the destination Schema Registry to IMPORT mode, first set mode.mutability=True in the configuration and make sure the destination Schema Registry is empty. See schema migration configuration with the Replicator connector for details.
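Once mode mutability is enabled, the mode is changed through the Schema Registry REST API with a PUT request to the /mode resource. The sketch below builds that request without sending it. The registry URL is a placeholder and the helper name is hypothetical.

```python
import json
from urllib.request import Request


def import_mode_request(schema_registry_url):
    """Build (without sending) the request that switches a registry to IMPORT mode."""
    return Request(
        f"{schema_registry_url.rstrip('/')}/mode",
        data=json.dumps({"mode": "IMPORT"}).encode(),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="PUT",
    )


# Placeholder URL for the destination Schema Registry.
request = import_mode_request("http://localhost:8081")
print(request.get_method(), request.full_url, request.data.decode())
```

Passing the request to urllib.request.urlopen would apply the change. The registry is expected to reject it if subjects are already registered, which is why the destination registry must start empty.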

Confluent’s recommendation is to deploy the Replicator source connector at the destination cluster (remote consuming). However, in our current setup some EFD deployments are not visible from the destination cluster due to VPNs and other networking considerations. Thus, we have deployed the Replicator source connector at the source clusters (remote producing). We have tested this setup to replicate data from the Summit EFD and Tucson test stand EFD to our EFD instance running on Google Cloud. Another good practice is to have a separate Kafka Connect deployment for the Replicator source connector, to isolate this connector from other connectors running in the cluster.

11   References

[1][SQR-029]. Frossie Economou. DM-EFD prototype implementation. 2019. URL: https://sqr-029.lsst.io
[2][SQR-031]. Angelo Fausti. EFD deployment. 2019. URL: https://sqr-031.lsst.io
[3][DMTN-082]. Simon Krughoff. On accessing EFD data in the Science Platform. 2018. URL: https://dmtn-082.lsst.io