KaPoW, Enova’s First Self-Service Data Pipeline

Introduction

At Enova, data science and analytics are central to our business, making the accessibility of data to users a first-class priority. Data powers all our decisions, from analytical to operational, in the company’s mission to provide fast, trustworthy credit to individuals and businesses around the world. Mirroring the company’s mission, the Data Engineering team at Enova strives to provide reliable and trustworthy access to data for our data scientists, analysts and business specialists, empowering them to make data-driven decisions.

The data pipeline is motivated by the need to move data from our rapidly cloud-bound, data-rich applications into a relational database such as PostgreSQL, where it can be queried at rest, effectively impedance matching the elasticity of the former with the flexibility of the latter. It needs to be self-service so that our independent teams can land the amorphous data from our many varied systems and platforms on their own.

The “Ka”fka-“Po”stgreSQL-“W”riter, or KaPoW for short, is our first self-service data pipeline. It provides an easy and well-architected way to move data from any system to any PostgreSQL database, effectively decoupling the SLAs of the producer and consumer. KaPoW is also Enova’s first application to use Apache Kafka. It has been running seamlessly for over 6 months and has landed close to 100 million packets of relational data from Ruby and Go microservices and lambdas to varied destinations, including our data warehouse and operational securitization data stores (wowza!). As a member of the Data Engineering team, I wanted to share the journey we went on with KaPoW and some fun things we learned along the way.

Not Franz Kafka, Apache Kafka

Often mistaken for the writer, Apache Kafka is a distributed streaming platform used to publish, subscribe to, store, and process streams of records, similar to a message queue. Kafka is different from a traditional message queue because it stores records, which allows them to be reprocessed later and the same record to be processed by multiple subscribers.

A Kafka cluster can be thought of as a collection of logs across several machines: messages are organized into topics and partitions, and topics can have multiple producers and consumers. Writing to and reading from Kafka can happen concurrently. Messages in Kafka are also retained for a configurable amount of time, even after they are consumed, which makes Kafka useful in situations where we may need to re-read data. Producers get an acknowledgement when they publish a message to a topic, and consumers save their offsets so that, if a crash occurs, they can resume reading from where they left off. Kafka also replicates its logs over multiple servers, so if one or more of those machines fail, messages won’t be lost.

The scalability, reliability, and durability Kafka provides in both replication and retention makes it perfect for our data pipeline.

The Birth of a (wait for it) Data Pipeline

A data pipeline is, simply put, software that automates the flow of data from one system to another. In most cases (and in the case of KaPoW), the raw data ends up in a data warehouse, a system that pulls data from many different sources, where it can be used by anyone.

Flow of data with KaPoW.

Not Data Replication

Currently at Enova, we use data replication to move data from application databases to Enova’s data warehouse. Data replication refers to the process of creating redundant copies of a database or data store. The KaPoW data pipeline is not replication; it can be thought of as a remote, asynchronous SQL INSERT statement with sub-millisecond latency and elastic scale into any PostgreSQL database.

Observability First

KaPoW, through its self-service nature, brings front and center the traditionally forgotten needs of analytics and reporting. In our experience at Enova, the availability of an easy-to-use data pipeline tool has led developers and analysts alike to adopt a more observability-first mindset, emitting large amounts of state to be analyzed, monitored and observed at rest.

A step in decoupling our data warehouse from our applications

By decoupling our data warehouse from applications, we avoid exposing our OLTP applications to queries that should only be running against a data warehouse.

Kafka acts as a buffer for all the data we need to persist to Postgres. Buffering allows us to throttle ingestion into Postgres, especially when data is being produced at a high rate. With Kafka and through KaPoW, we’re able to bring elasticity to Postgres; by efficiently multiplexing connections, we can pace out the load on our Postgres databases without slowing down our critical path application processes.

KaPoW can be used to land data into just about any database for any purpose. We have simply chosen to highlight the benefits we have reaped so far with warehousing and reporting. The async nature of KaPoW lends itself well to delayed concerns such as reporting that do not have strictly consistent data model requirements. However, as other consistency models, such as eventual consistency, become more commonplace on the OLTP side in the industry, this data-pipeline can see use for landing transactional data just as easily.

If We Can’t Trust Data, What Can We Trust?!

As William Shakespeare once said, “Don’t trust the person who has broken faith once” or, in this case, don’t trust the data pipeline with incomplete data. Producing anything but high-quality data will cause the loss of trust and confidence in the data pipeline and the data itself. 

Therefore, ensuring data quality is the most important part of a data pipeline, which is why we’ve dedicated significant time and consideration to ensuring KaPoW always produces complete data. This means that even if KaPoW lands data correctly 99.95% of the time, we still need to make sure the other 0.05% of the data also lands correctly.

Graceful Restarts

We need to ensure that messages in the middle of processing finish even if KaPoW is interrupted. To do that, we implemented custom signal handling and hand-configured the DevOps machinery, such as our process monitors and USE (utilization, saturation, error) monitors, to complement the application’s signal handling parameters. This way, we ensure that all data lands, irrespective of control plane operations like restarts due to deployments, maintenance or resource violations like memory bloat.
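
The post doesn’t show the actual implementation, but a minimal Go sketch of this kind of graceful shutdown, assuming a simple single-consumer loop and a hypothetical processNextPacket helper, might look like the following: on SIGINT/SIGTERM the loop stops picking up new packets, and the process waits for the in-flight packet to finish before exiting.

package main

import (
    "context"
    "log"
    "os/signal"
    "syscall"
)

func main() {
    // ctx is cancelled when a termination signal (deploy, restart, Ctrl-C) arrives.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    done := make(chan struct{})
    go func() {
        defer close(done)
        for {
            select {
            case <-ctx.Done():
                return // stop picking up new packets
            default:
            }
            processNextPacket() // hypothetical: read one packet, persist it, commit the offset
        }
    }()

    <-ctx.Done() // shutdown requested
    log.Println("shutdown requested, letting the in-flight packet finish")
    <-done // the current packet has landed; safe to exit
}

func processNextPacket() {
    // Placeholder for the real read -> persist -> commit cycle.
}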

An example of one of the graphs we have with KaPoW: the successes and failures of the different parts in processing a packet in KaPoW (filter, sort and persist) per database.

Message Retries

KaPoW is continually running, reading from Kafka and writing to Postgres. If there are any issues with persisting to a database, KaPoW will still continue to function, skipping ahead to process newer, incoming packets. However, we don’t want to simply drop the data that failed persistence.

Errors during processing in KaPoW are broadly classified into two categories: user and system. Errors due to violations by the client (such as schema mismatches or bad data formats) are examples of errors that would be classified as user errors. Errors beyond the control of clients (such as database downtime or network blips) are classified as system errors.

The reasoning behind this classification is the difference in retry-ability between the two categories. Packets that experienced user-level errors require some manual intervention before a retry can succeed. Packets that experienced system-level errors, on the other hand, have a good chance of succeeding if simply retried at a later time.

Consequently, KaPoW exposes two topics, a user errors topic and a system errors topic, into which it pushes errored packets along with some metadata contextualizing the packet and error. The user errors topic effectively acts as a callback, with the expectation that clients subscribe to it and fix the associated issues before republishing packets for retry. The system errors topic is essentially a dead letter queue that application administrators can interact with through a set of endpoints to view, reschedule or purge errored packets.
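
As a rough illustration of this classification (in Go, with hypothetical error values and topic names; the post doesn’t show KaPoW’s internals), the sketch below routes an errored packet to either the user errors topic or the system errors topic based on whether the failure needs a client-side fix:

package kapow

// Hypothetical error values and topic names, purely for illustration.

import "errors"

var (
    ErrSchemaMismatch = errors.New("schema mismatch")      // caused by the client: a user error
    ErrBadDataFormat  = errors.New("bad data format")      // caused by the client: a user error
    ErrDatabaseDown   = errors.New("database unreachable") // outside the client's control: a system error
)

const (
    userErrorsTopic   = "kapow.user_errors"   // callback topic that clients subscribe to
    systemErrorsTopic = "kapow.system_errors" // dead letter queue managed by administrators
)

// errorTopic picks the topic an errored packet is republished to, based on
// whether the failure needs manual intervention before a retry can succeed.
func errorTopic(err error) string {
    switch {
    case errors.Is(err, ErrSchemaMismatch), errors.Is(err, ErrBadDataFormat):
        return userErrorsTopic // needs a client-side fix first
    default:
        return systemErrorsTopic // likely to succeed if simply retried later
    }
}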

Distributed Transactions

One of the hard problems to solve was the lack of distributed transactions across Kafka and PostgreSQL. Short of writing a transaction manager or implementing the two-phase commit protocol, we had to choose between reprocessing or losing a packet when interrupted. Given that the model of our data pipeline is write-only, reprocessing can easily be turned into an idempotent operation through integrity constraints like uniqueness checks at the database level. Hence, we chose a read-from-Kafka, persist-in-PostgreSQL, commit-offset-back-to-Kafka processing model, with a NOOP on violations of the natural unique constraints that are a prerequisite on any table KaPoW persists data into.
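
A minimal sketch of this processing model is shown below, assuming the segmentio/kafka-go client, the lib/pq Postgres driver, and hypothetical broker, topic, table and column names; the destination table is assumed to carry the natural unique constraint, so ON CONFLICT DO NOTHING turns a replayed packet into a no-op:

package main

import (
    "context"
    "database/sql"
    "log"

    _ "github.com/lib/pq" // Postgres driver
    kafka "github.com/segmentio/kafka-go"
)

func main() {
    ctx := context.Background()

    // Hypothetical DSN; the real destination comes from the packet's pipeline_context.
    db, err := sql.Open("postgres", "postgres://kapow@proxydb-masterdb.enova.com/your_destination_db_name?sslmode=require")
    if err != nil {
        log.Fatal(err)
    }

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"kafka:9092"}, // placeholder broker
        GroupID: "kapow",
        Topic:   "events",
    })
    defer r.Close()

    for {
        // 1. Read: fetch the next packet without committing its offset yet.
        msg, err := r.FetchMessage(ctx)
        if err != nil {
            log.Fatal(err)
        }

        // 2. Persist: the natural unique constraint turns a replayed packet into a no-op.
        _, err = db.ExecContext(ctx,
            `INSERT INTO your_destination_schema.your_destination_table_name (id, payload)
             VALUES ($1, $2)
             ON CONFLICT (id) DO NOTHING`,
            string(msg.Key), msg.Value)
        if err != nil {
            // In the real pipeline this is where user/system error routing would happen.
            log.Printf("persist failed, not committing offset: %v", err)
            continue
        }

        // 3. Commit: only after the row has landed do we advance the Kafka offset.
        if err := r.CommitMessages(ctx, msg); err != nil {
            log.Printf("commit failed, packet may be reprocessed (safely): %v", err)
        }
    }
}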

Monitoring

We emit RED (rate, error, duration) metrics at a high level of granularity in KaPoW. Apart from serving as a heartbeat/visual pulse for the application, we use these metrics to set alerts on predetermined thresholds on inflow rates by topic and error percentages in both the user and system topics to detect anomalies and stay ahead of disruptions.
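
The post doesn’t name the metrics stack, but as one possible illustration, a RED-style instrumentation sketch in Go using Prometheus (with hypothetical metric and label names) could look like this:

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
    // Rate and errors: packets processed per topic, labelled by outcome.
    packetsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "kapow_packets_total",
        Help: "Packets processed, by topic and outcome (success, user_error, system_error).",
    }, []string{"topic", "outcome"})

    // Duration: how long the persist step takes per destination database.
    persistSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name: "kapow_persist_duration_seconds",
        Help: "Time spent landing a packet in Postgres.",
    }, []string{"database"})
)

func main() {
    // Record one successful packet and one timed persist, then expose /metrics for scraping.
    packetsTotal.WithLabelValues("events", "success").Inc()

    timer := prometheus.NewTimer(persistSeconds.WithLabelValues("warehouse"))
    // ... persist the packet here ...
    timer.ObserveDuration()

    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":9090", nil))
}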

Additionally, to get to the very bottom of edge-case issues, we have INFO level logging turned on by default. After being ingested and indexed by our Splunk heads, this serves as the ultimate source of truth for any deep-dive debugging on a per packet level.

How To: KaPoW

KaPoW provides an easy and well-architected way to move data to any database. It is the first piece of self-service data infrastructure at Enova, used to route any message from any Kafka topic to any table in any Postgres database without needing any intervention from a service team.

Self-Service Infrastructure

KaPoW’s self-service aspect comes from its contract requiring that clients define the Postgres destination alongside the data in each and every packet, similar to how a mailing address works on a package. As long as the client writes to a Kafka topic that KaPoW reads from, uses the correct message format, and names a destination KaPoW can already write to, nothing else needs to be done. If the client wishes to send data to a database that KaPoW does not deliver to, or to write to a Kafka topic that KaPoW does not listen on, the only changes required are adding the new database or topic to the corresponding configuration files and restarting the application.

In order to process a message successfully, KaPoW implements 3 steps:

1. Filter: checks that the Kafka message has the correct Kafka header. Since Kafka topics at Enova are, as the name suggests, topical, it is entirely possible that there are packets/messages on the topic that have nothing to do with data-pipeline delivery. To distinguish those from packets intended for the data pipeline, KaPoW requires the following Kafka header for a message to pass the filter:

{
  "context": {
    "consumer": {
      "type": "data_pipeline"
    }
  }
}

2. Sort: checks that the Kafka message has the correct format and that the database the message is labeled for is a destination KaPoW is configured to deliver to. It’s worth noting that KaPoW also supports delivery to multiple destinations, defined in the message’s destinations list. KaPoW requires the following Kafka message format in order to successfully sort the message (a hypothetical producer sketch after this list shows how a client might publish a message that passes both the filter and sort steps):

{
  "pipeline_context": {
    "destinations": [
      {
        "type": "postgres",
        "vanity": "proxydb-masterdb.enova.com",
        "database": "your_destination_db_name",
        "schema": "your_destination_schema",
        "table": "your_destination_table_name"
      }
    ]
  },
  "data": {
    "column_name": "value",
    ....,
    ....
  }
}

3. Persist: attempts to land the data in the defined destination(s). Persist is the step where KaPoW takes a connection from the connection pool and inserts the data into the defined database destination(s).
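
Putting the three steps together from the client’s side, here is a hypothetical producer sketch using the segmentio/kafka-go client. The broker address, the topic name, and the exact way the filter header is encoded as a Kafka record header key/value pair are assumptions; the destination fields mirror the message format above.

package main

import (
    "context"
    "encoding/json"
    "log"

    kafka "github.com/segmentio/kafka-go"
)

func main() {
    w := &kafka.Writer{
        Addr:  kafka.TCP("kafka:9092"), // placeholder broker
        Topic: "events",                // a topic KaPoW is assumed to listen on
    }
    defer w.Close()

    // Step 2's message format: routing metadata plus the row to insert.
    packet := map[string]any{
        "pipeline_context": map[string]any{
            "destinations": []map[string]string{{
                "type":     "postgres",
                "vanity":   "proxydb-masterdb.enova.com",
                "database": "your_destination_db_name",
                "schema":   "your_destination_schema",
                "table":    "your_destination_table_name",
            }},
        },
        "data": map[string]any{"column_name": "value"},
    }
    value, err := json.Marshal(packet)
    if err != nil {
        log.Fatal(err)
    }

    // Step 1's filter header, attached as a Kafka record header (key/value split is an assumption).
    err = w.WriteMessages(context.Background(), kafka.Message{
        Headers: []kafka.Header{{Key: "context", Value: []byte(`{"consumer":{"type":"data_pipeline"}}`)}},
        Value:   value,
    })
    if err != nil {
        log.Fatal(err)
    }
}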

Load Testing and Scaling

KaPoW is continuously running and writing to Postgres to land data. If we were to connect to the database directly, that would mean an average of 3,000 connects and disconnects a minute at KaPoW’s current traffic. Instead, KaPoW uses a connection pooler to maintain and reuse a pool of database connections.
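
The post doesn’t name the pooler, but as a rough in-process illustration of the idea, Go’s database/sql pool below caps and reuses connections (with a hypothetical DSN and limits) so that each packet doesn’t pay for a fresh connect and disconnect:

package main

import (
    "database/sql"
    "log"
    "time"

    _ "github.com/lib/pq" // Postgres driver
)

func main() {
    db, err := sql.Open("postgres", "postgres://kapow@proxydb-masterdb.enova.com/your_destination_db_name?sslmode=require")
    if err != nil {
        log.Fatal(err)
    }

    db.SetMaxOpenConns(10)                  // upper bound on concurrent writes per database
    db.SetMaxIdleConns(10)                  // keep connections warm between packets
    db.SetConnMaxLifetime(30 * time.Minute) // recycle connections periodically

    // Every insert now borrows a pooled connection instead of opening a new one.
    if _, err := db.Exec(`SELECT 1`); err != nil {
        log.Fatal(err)
    }
}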

In order to gain even more confidence in our new data pipeline, we load tested KaPoW by producing messages on a Kafka topic with 6 partitions and a replication factor of 3. The following are our results:

With packets of an average size of 250 bytes, KaPoW’s processing rate is about 8,000 packets per minute (ppm). In further benchmarks with packets of an average size of 10-15 bytes, the rate rose to about 80,000 ppm.

The throughput of KaPoW scales linearly with the number of partitions in the Kafka topics it reads from and the number of database connections it can use to write. Increasing both of these numbers increases throughput, allowing KaPoW to scale out almost arbitrarily. And even if KaPoW were faced with traffic that exceeded our throughput, the only effect would be a small amount of lag in landing the data in the destination databases. Although “real-time” data may not be guaranteed, we have confidence in the resilience and elasticity of KaPoW.

Is it Really a Blog Post Without a Summary?

KaPoW represents a lot of firsts for Enova. First self-service data pipeline. First application to use Apache Kafka. First step in decoupling our data warehouse from our applications. The success of our first data pipeline has paved the road for more data pipelines to be developed. With Apache Kafka set up in production, we can find more exciting use cases for this technology. We on Data Engineering are continuously focusing on making data from applications highly accessible to all users, enhancing our data architecture, and leveraging different technologies.