Journey to 15 Million Records Per Second: Managing Persistent Connections
By Ayushri Arora, Ruchi Saluja, Raghu Nandan D
“Data is the new oil. Like oil, data is valuable, but if unrefined it cannot really be used. It has to be changed into gas, plastic, chemicals, etc. to create a valuable entity that drives profitable activity. So must data be broken down, analysed for it to have value.”
— Clive Humby
In today’s highly competitive environment, enterprises must make decisions at a rapid pace, backed by data. More often than not, the most valuable insights are hidden deep inside a huge volume of complex and unorganized business data.
Domain Name System (DNS) logs are a source of data that can provide valuable insights for decision making by operators and other businesses. The large volume of this data is a treasure trove for network operators detecting and remediating security threats. It can also help businesses with creative pricing and packaging by revealing customer connectivity needs on a near real-time basis. While mining DNS queries can be of immense value to a business, realizing that value through data collection, enrichment, ingestion, and subsequent analysis poses significant technical challenges.
In this article, we discuss our journey to a peak throughput of 15 million records per second when processing DNS queries. The earlier article, Classifying High Volume DNS Traffic In Near Real-Time, discusses the problem, the high-level technical architecture, and the technical components selected for the final solution. Here we discuss the challenges and solutions in managing persistent connections from a client to a server deployed on container orchestration or cluster auto-scaling platforms like Kubernetes or VM Scale Sets.
The entire process can be broadly divided into four stages:
- Data collection
- Data enrichment
- Data ingestion
- Analytics and visualization
Here we discuss some of the approaches we took and the challenges we faced in the first two stages of the process listed above.
Data Collection: Using wire tracing or log files to collect the queries is an expensive process, so we used dnstap for collecting query data. Rather than capturing network packets, dnstap generates its data from within the DNS implementation. It uses protocol buffers to encode events that occur inside DNS software. The data collection operates asynchronously, meaning that regular DNS operations within resolvers continue independently of the measurements being taken, thus minimizing the impact on performance.
Data Enrichment: DNS queries are classified based on pre-set rules in the policy server. The data is fed into a cloud database management system like Snowflake for further analysis. The data enrichment happens in near real time. Therefore, it needs a container orchestration platform like Kubernetes that can automatically scale based on traffic volume.
The client-server code was written in Rust. The rationale for this choice can be found in the earlier article mentioned above. The process is summarized below, first for the client (agent) and then for the server:
1. Establish persistent connection using a control frame handshake (typically Connect-ACK-ACK).
2. Divide frame data into chunks and remove the control frame (typically at the start and the end of the transaction).
3. Maintain current pointer for keeping track.
4. Write decoded data frame to server.
5. Use connection handling:
- Switch connections at predefined interval.
- Retry with exponential back-off (write retry and connection retry).
6. Limit rate.
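The retry step in the client list above can be sketched as follows. This is a minimal illustration using only the Rust standard library, not the authors' implementation: the function names `backoff_delay` and `retry_with_backoff`, the doubling factor, and the cap on the delay are all assumptions made for the example.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
/// The doubling factor and the cap are assumptions for this sketch.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    // checked_mul returns None on overflow; fall back to the cap in that case.
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Calls `op` until it succeeds or `max_attempts` calls have failed,
/// sleeping for an exponentially growing delay between attempts.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Give up and return the last error once the budget is spent.
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                std::thread::sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

The same helper could wrap both a write and a connection attempt by passing a different closure as `op`.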
On the server side:
1. Poll the policy server APIs for lookup data and load it into hashmaps.
2. Keep listening for incoming connection.
3. Spawn new asynchronous task for each new connection.
4. Handshake with agent prior to receiving traffic.
5. Process received data stream.
6. Use connection limiting.
7. Decode received frame and perform lookup in hashmaps.
8. Create CSV records after decoding and mapping.
9. Spawn asynchronous tasks for writing CSV records in chunks to files mounted on blobfuse.
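The lookup and CSV steps in the server list above can be illustrated with a short sketch. The `Query` struct, its field names, the `to_csv_record` function, and the `"unclassified"` fallback are hypothetical; the article does not describe the actual record layout. The sketch also assumes that field values contain no commas, so no CSV escaping is done.

```rust
use std::collections::HashMap;

/// Hypothetical decoded DNS query; the field names are assumptions.
struct Query {
    client_ip: String,
    domain: String,
}

/// Looks up the query's domain in the policy hashmap and renders one CSV line.
/// Domains missing from the map get the placeholder category "unclassified".
fn to_csv_record(query: &Query, categories: &HashMap<String, String>) -> String {
    let category = categories
        .get(&query.domain)
        .map(String::as_str)
        .unwrap_or("unclassified");
    format!("{},{},{}", query.client_ip, query.domain, category)
}
```

In a setup like the one described, many such lines would be buffered and written to a file in chunks rather than one at a time.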
Managing Persistent Connections
The server-side code is containerized and deployed on Kubernetes (K8s) pods. The client establishes a TCP connection to the server through the load balancer service IP. kube-proxy maintains iptables rules for the service, and these rules route each new connection to an available pod. Kubernetes can automatically scale the pods and balance new connections between them. However, this balancing happens only when a connection is established, so traffic on a persistent TCP connection is never redistributed.
The challenges faced in managing these persistent connections include:
- Once the client establishes a TCP connection with a pod, the load is not distributed to other pods even when the traffic increases, unless the client closes the connection and re-establishes it.
- Even when the client closes the connection and re-establishes it, there is no guarantee that the new connection is established with a newly scaled pod. It might still be connected to a pod that is busier.
- Until the client closes its connection, the server resources are not freed up, even after all data has been received.
- If the server closes the connection, the client is not aware of it and continues sending requests. Ideally, the server should send a request-timeout status code, but even then not all clients re-send the request.
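The first challenge can be made concrete with a small simulation. This is a toy model, not Kubernetes code: the `distribute` function is hypothetical, and pod selection at connect time is modeled as round-robin purely to keep the result deterministic, whereas the real selection gives no such guarantee (which is the second challenge above). It assumes `pods` is greater than zero.

```rust
/// Counts how many records each pod receives when the pod is chosen only
/// at connect time. `reconnect_every = None` models one persistent
/// connection; `Some(n)` models a client that reconnects every `n` records.
/// Round-robin pod selection is an assumption made for determinism.
fn distribute(records: usize, pods: usize, reconnect_every: Option<usize>) -> Vec<usize> {
    let mut per_pod = vec![0usize; pods];
    let mut connections_opened = 0usize;
    let mut current_pod = 0usize;
    for i in 0..records {
        let must_connect = match reconnect_every {
            None => i == 0,
            Some(n) => i == 0 || (n > 0 && i % n == 0),
        };
        if must_connect {
            // The pod is picked here and stays fixed until the next connect.
            current_pod = connections_opened % pods;
            connections_opened += 1;
        }
        per_pod[current_pod] += 1;
    }
    per_pod
}
```

With a single persistent connection, every record lands on the first pod no matter how many pods exist; only reconnecting gives the other pods a chance to receive traffic.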
Further, scaling can be done in various ways in Kubernetes. By default, the Horizontal Pod Autoscaler (HPA) reads metrics from the metrics server and computes the desired number of replicas as:
desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]
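As a quick check of the formula, here is a minimal sketch in Rust. The function name `desired_replicas` is an assumption, and the sketch covers only the formula itself; it leaves out other HPA behavior such as minimum and maximum replica bounds.

```rust
/// desiredReplicas = ceil(currentReplicas * (currentMetricValue / desiredMetricValue)).
/// Assumes `desired_metric` is positive.
fn desired_replicas(current_replicas: u32, current_metric: f64, desired_metric: f64) -> u32 {
    let ratio = current_metric / desired_metric;
    (f64::from(current_replicas) * ratio).ceil() as u32
}
```

For example, 4 replicas observing 200 requests per second each against a target of 100 give a ratio of 2 and therefore 8 desired replicas, while 3 replicas at 150 against a target of 100 give ceil(4.5) = 5.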
For customized scaling, we tried the following options:
Kubernetes Metric Adapters: This adapter enables scaling of application deployment pods running on Kubernetes, using the HPA with external metrics from Azure resources (such as Service Bus queues) and custom metrics stored in Application Insights. We took the requests-per-second metric from Azure Application Insights and used it with the HPA.
Kubernetes Event-driven Autoscaling (KEDA): KEDA provides multiple options for selecting the event source on which the pods are scaled. Each application container can be individually scaled based on different events. We used a custom metric API for scaling. As soon as the load increased, the pods scaled. However, the scaled pods remained idle. Since all the requests are sent over a single TCP connection, the Kubernetes iptables rules are evaluated only once, when the connection is established and a pod is selected. All subsequent requests land on the same pod.
We tried multiple options to ensure that the client traffic is distributed across all servers:
Ingress Controller: Azure Application Gateway Ingress Controller (AGIC) uses an application gateway, which is a layer 7 load balancer. It works only for web traffic and does not work for layer 4 traffic like TCP and UDP.
Usage of Linkerd: Linkerd's traffic split functionality allows a portion of the traffic destined for one Kubernetes service to be shifted dynamically to another service. However, the client traffic in this case cannot be split even if a new pod is added.
Network Policy: We tried using network policy to allow or deny traffic between the pods. This did not work because there was no egress traffic from pods.
Usage of Metrics Adapter: We created an environment variable that holds the number of pods in the cluster and is refreshed by constant polling. As soon as a new pod is added according to the thresholds in the HPA configuration, the client connection is closed and re-established, with the intent that the new connection will be established with the new pod. This approach was not pursued because it can result in inconsistent system behavior.
Custom Connection Switching: Initially, each client establishes two connections (tcp1 and tcp2). An asynchronous task switches the connection at every predefined threshold interval (switch_time) and updates the corresponding connection identifier flag (tcp_flag).
For spawning the asynchronous task, we used the Rust library Tokio. We moved values across the spawned task via Multi-Producer, Single-Consumer (MPSC) channels. According to the Rust Tokio documentation, an MPSC channel is “A multi-producer, single-consumer queue for sending values across asynchronous tasks.” Receiver and sender handles are available. The receiver allows a task to read values out of the channel. If there is no message to read, the current task will be notified when a new value is sent. The sender allows sending messages into the channel. If the channel is at capacity, the send is rejected, and the task will be notified when additional capacity is available.
In our current implementation, connections are sent from a spawned task via the channel and received in the main thread. After every write, the connection that just wrote is replaced with a fresh connection received from the spawned task. For example, while tcp1 is writing, tcp2 is refreshed, and vice versa. With this approach, each refreshed connection is routed afresh across all available servers, so traffic can reach newly scaled pods.
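The switching scheme can be sketched as follows. This is a simplified illustration under several assumptions, not the authors' code: it uses a thread and `std::sync::mpsc::sync_channel` from the standard library in place of a Tokio task and Tokio's MPSC channel so that it runs without external crates, it switches after every batch instead of after a `switch_time` interval, and `Conn` and `run_switching` are hypothetical names standing in for real TCP connections and the client write loop.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for a TCP connection; `id` only marks which
/// connection handled a write.
struct Conn {
    id: u32,
}

/// Writes `batches` batches, alternating between two connection slots and
/// refreshing the idle slot from a background producer after every switch.
/// Returns the id of the connection used for each batch.
fn run_switching(batches: usize) -> Vec<u32> {
    // Bounded channel: the producer blocks while the buffer is full.
    let (tx, rx) = sync_channel::<Conn>(1);

    // Background task that "opens" fresh connections (ids 3, 4, 5, ...).
    let producer = thread::spawn(move || {
        let mut next_id = 3;
        // send() fails once the receiver is dropped, which ends the loop.
        while tx.send(Conn { id: next_id }).is_ok() {
            next_id += 1;
        }
    });

    let mut slots = [Conn { id: 1 }, Conn { id: 2 }]; // tcp1 and tcp2
    let mut tcp_flag = 0usize; // index of the slot currently used for writes
    let mut used = Vec::with_capacity(batches);

    for _ in 0..batches {
        used.push(slots[tcp_flag].id); // "write" the batch on the active slot
        let idle = tcp_flag; // the slot that just wrote becomes idle
        tcp_flag = 1 - tcp_flag; // switch to the other connection
        // Replace the idle slot with a fresh connection from the producer.
        slots[idle] = rx.recv().expect("producer stopped unexpectedly");
    }

    drop(rx); // makes the producer's next send() fail so that it exits
    producer.join().expect("producer panicked");
    used
}
```

Because every write goes out on a connection that was opened recently, each batch has a fresh chance of being routed to a newly scaled pod, which is the effect the switching approach aims for.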
Our solution is a novel approach for managing persistent connections. It is a scalable, cloud-native, stateless architecture. It is applicable to container orchestration platforms like Kubernetes and to other auto-scaling solutions like virtual machine scale sets in Azure. The solution is running successfully in enterprise client environments, where it was implemented for peak traffic of 15 million records per second.
In our subsequent article, we will share various other challenges we faced and solutions we implemented in the data enricher solution using Rust.