Unlocking the Power of Data: How we scaled our analytics with in-house Event Logging

Smit Patel
Instawork Engineering
Mar 7, 2023


At Instawork, data plays a significant role in our growth and is critical for informed decision-making. We use Amplitude to track and analyze data in ways that fit the needs of many users within our org. However, relying solely on such services was becoming expensive and was limiting our ability to control and customize our data collection and analysis.

To address this challenge, we decided to build our own event logging platform, which can record events or actions that occur within any part of our systems or applications. This gave us granular control over what data is collected and how it is processed.

The problem

For startups, it may be tempting to rely only on product analytics services for event logging. For stakeholders like product managers, product analysts, and data scientists, these services can be great for gaining insights into user behavior. However, there are several reasons why we considered implementing our own event-logging service:

  1. Granular data control: An event logging system allows us to have more granular control and customizability over what data is being collected and how it is being processed.
  2. Flexibility and scalability: An event logging system provides the flexibility and scalability needed to support complex data collection and analysis.
  3. Data ownership: With a product analytics service, the data is typically owned by the service provider and often comes with limits on the number of events, event properties, and data volume. With an event logging system, we have full ownership and control over our data, and we can easily integrate it with other systems and tools within our platform, e.g. a machine learning pipeline.
  4. Real-time processing: With product analytics services, you can set up an integration to export collected event data to your data warehouse. However, these integrations are not real-time. Having your own event logging service allows you to process event data in real time, making it more actionable.

Design

Our event logging service has 3 primary functions.

  1. Collecting data
  2. Archiving data
  3. Making data available in the Data Warehouse

It typically consists of:

  1. a set of APIs that developers can use to log events (Collecting data)
  2. a message broker for collecting and processing event data (Collecting data)
  3. a storage system for archiving event data (Archiving data)
  4. a data pipeline to ingest collected data into a common data warehouse (Making data available in the Data Warehouse)

The message broker here plays a critical role in data collection and archival (Components 1, 2, 3). One popular message broker for event logging is Apache Kafka. Kafka is a distributed streaming platform designed to handle high volumes of data in real time. The following attributes make Kafka a popular choice for building data pipelines, stream processing, and event-driven applications such as event logging services:

  1. Scalability: Kafka is designed to handle high volumes of data, making it an excellent choice for event logging. It can handle millions of events per second, making it easy to scale up as your system grows.
  2. Fault-tolerance: Kafka is highly available and fault-tolerant. It is designed to replicate data across multiple nodes, ensuring that events are not lost in the event of a failure.
  3. Real-time processing: Kafka allows you to process events in real-time, enabling you to detect and respond to issues as they happen.
  4. Flexibility: Kafka supports a wide range of use cases, and has a mature ecosystem of tools like Schema Registry, Kafka Connect, and kSQL that are essential in building an event logging service.

So overall, our high-level system architecture looks something like this. We will go over some of these components.

The collector service

This service is the first part of our event data pipeline. It is a high-throughput, low-latency service that collects events in real time from our clients. Clients within our systems and applications use the API provided by this service to send batches of events to be logged; a rough sketch of such a client call follows the list below.

There are three primary functions of this service.

  1. Client Authentication: authenticate the client sending the events
  2. Event Schema Validation: validate the event batch data against a defined schema to ensure that it is properly formatted and includes all required fields
  3. Produce to Kafka: use a Kafka producer library to publish events to a Kafka topic
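
From the client's point of view, a call to this API could look roughly like the following sketch. The endpoint, the X-API-Key header, and the event values are illustrative assumptions; the payload shape matches the Event and EventBatch types defined below.

// Hypothetical client: POST a small batch of events to the collector.
package main

import (
	"bytes"
	"encoding/json"
	"net/http"
	"time"
)

func main() {
	batch := map[string]interface{}{
		"events": []map[string]interface{}{{
			"event_type": "impression", // illustrative values
			"event_name": "shift_card_viewed",
			"session_id": "abc-123",
			"timestamp":  time.Now().UnixMilli(),
			"props":      map[string]string{"screen": "shift_list"},
		}},
		"meta":         map[string]string{"app_version": "1.2.3"},
		"client_props": map[string]string{"platform": "android"},
	}

	body, _ := json.Marshal(batch)
	req, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/events", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-API-Key", "CLIENT_API_KEY") // assumed auth scheme

	if resp, err := http.DefaultClient.Do(req); err == nil {
		resp.Body.Close()
	}
}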

Go can be a great choice for building a high-performance, low-latency event logging service.

Let’s start with a simple web server built using the Gin framework.

package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

type Event struct {
	EventType string            `json:"event_type"`
	EventName string            `json:"event_name"`
	SessionID string            `json:"session_id"`
	Timestamp int64             `json:"timestamp"`
	Props     map[string]string `json:"props"`
}

type EventBatch struct {
	Events      []Event           `json:"events"`
	Meta        map[string]string `json:"meta"`
	ClientProps map[string]string `json:"client_props"`
}

func main() {
	// gin web server
	router := gin.New()
	router.POST("/events", handleEvents)

	_ = router.Run("localhost:8080")
}

func handleEvents(c *gin.Context) {
	var eventBatch EventBatch

	if err := c.BindJSON(&eventBatch); err != nil {
		// handle error: BindJSON already aborts the request with a 400 status
		return
	}
	// send to event channel.
	c.JSON(http.StatusOK, gin.H{"message": "events tracked successfully"})
}
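
The handler above only parses the request. The first two responsibilities listed earlier, client authentication and event schema validation, could be wired in roughly like this. This is a minimal sketch, assuming a simple API-key check and required-field validation; authRequired and validateEventBatch are hypothetical helpers, and a production setup would more likely use signed client tokens and a schema registry.

// Hypothetical auth middleware and batch validation (uses the standard "errors" package).
var clientKeys = map[string]string{"android-app": "CLIENT_API_KEY"} // assumed key store

func authRequired() gin.HandlerFunc {
	return func(c *gin.Context) {
		key := c.GetHeader("X-API-Key")
		for _, valid := range clientKeys {
			if key != "" && key == valid {
				c.Next()
				return
			}
		}
		c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"message": "unknown client"})
	}
}

func validateEventBatch(batch *EventBatch) error {
	if len(batch.Events) == 0 {
		return errors.New("empty event batch")
	}
	for _, e := range batch.Events {
		// every event needs at least a type, a name, and a timestamp
		if e.EventType == "" || e.EventName == "" || e.Timestamp == 0 {
			return errors.New("event is missing required fields")
		}
	}
	return nil
}

The middleware would be registered with router.Use(authRequired()), and handleEvents would call validateEventBatch before accepting the batch.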

To log events to Kafka, we will need to create a publisher. This publisher has an internal channel to buffer event batches sent from the web server.

import (
	"github.com/segmentio/kafka-go" // kafka-go client, imported as "kafka"
)

type Publisher struct {
	Producer          *kafka.Writer
	EventBatchChannel chan *EventBatch
}

var publisher Publisher

func main() {
	// gin web server..
	// ...

	publisher = Publisher{
		Producer: &kafka.Writer{
			Addr:         kafka.TCP("localhost:9092"),
			Topic:        "events",
			Async:        true,
			RequiredAcks: kafka.RequireOne,
		},
		EventBatchChannel: make(chan *EventBatch, 1000), // channel with 1000 buffer size
	}
}

func handleEvents(c *gin.Context) {
	// ...
	// send to event channel
	publisher.EventBatchChannel <- &eventBatch
}

Now we have event batches buffered in the publisher channel. We will need some way to send them to the Kafka topic. Let’s create a goroutine that runs concurrently with the rest of the web server.

func main() {
	// gin web server...
	// ...

	// start worker(s) to publish events to kafka
	go publishEvents(1)
}

func publishEvents(id int) {
	// This is a worker goroutine which reads event batches from EventBatchChannel and writes them to Kafka
	fmt.Printf("Worker[%d] started.\n", id)
	for {
		select {
		case data, ok := <-publisher.EventBatchChannel:
			if !ok {
				fmt.Printf("Got kill signal for EventBatchChannel. Stopping worker[%d]\n", id)
				return
			}
			writeEventBatchToKafka(data)
		}
	}
}
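
The worker above exits when EventBatchChannel is closed. One way to tie that to process shutdown, as a sketch assuming we trap SIGINT/SIGTERM (the exact shutdown wiring of our service is not shown here), uses the os, os/signal, and syscall packages:

func waitForShutdown() {
	// Block until the process receives an interrupt or termination signal.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
	<-sig

	// Closing the channel lets the worker drain buffered batches and then return.
	close(publisher.EventBatchChannel)

	// Closing the async writer flushes any messages still buffered in it.
	_ = publisher.Producer.Close()
}

In main, this could run after the workers are started, for example in its own goroutine alongside router.Run.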

func writeEventBatchToKafka(batch *EventBatch) {
	messages := make([]kafka.Message, 0, len(batch.Events))
	for i := range batch.Events {
		messages = append(messages, makeMessage(&batch.Events[i]))
	}
	// With Async set on the writer, this call does not block; write errors are
	// reported through the writer's Completion callback or logger.
	_ = publisher.Producer.WriteMessages(context.Background(), messages...)
}
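
The makeMessage helper used above is not shown here; a minimal sketch, assuming each event is serialized as JSON and keyed by its session ID so that events from the same session land on the same partition:

func makeMessage(e *Event) kafka.Message {
	// Serialize the event as JSON (error ignored here for brevity).
	payload, _ := json.Marshal(e)
	return kafka.Message{
		Key:   []byte(e.SessionID),
		Value: payload,
	}
}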

The event archive

The second part of the event data pipeline consists of Kafka and S3. This is where we store the collected event data as an archive. To move events from Kafka to S3, we use Confluent's Amazon S3 Sink Connector, which can export data with an exactly-once delivery guarantee. We run it using the MSK Connect feature of Amazon MSK, which is built on the open-source Kafka Connect framework.

This connector is an important part of the pipeline as it controls how archive files are created. We want to configure the following to suit our needs.

  1. Maximum events to store in a single file
  2. Time to wait before flushing events to an archive file
  3. Path structure

An example configuration (tasks.max should be less than or equal to the number of topic partitions):

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "TOPIC_NAME_HERE",
  "s3.region": "AWS_REGION",
  "s3.bucket.name": "S3_BUCKET_NAME",
  "tasks.max": "2",
  "flush.size": 10000,
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'dt'=YYYY-MM-dd/'hr'=HH",
  "partition.duration.ms": 3600000,
  "timezone": "UTC",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "timestamp",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "schema.compatibility": "NONE"
}

To the data warehouse...

The last part of our data pipeline is making this event data available in our data warehouse, Amazon Redshift. We use the Serverless Framework to deploy and manage this pipeline step.

Essentially, this serverless application has an AWS Lambda function that is triggered every time an S3 archive file is created by the MSK connector. The function connects to the Redshift cluster and uses the COPY command to ingest the event data into the target table.
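
As a rough illustration, the core of such a function could look like the sketch below. This assumes a Go Lambda built with the aws-lambda-go library and the lib/pq driver, an events target table, and connection details passed via environment variables; the real function also handles retries, deduplication, and the other components described below.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	_ "github.com/lib/pq" // Redshift speaks the Postgres wire protocol
)

// handler runs once per object-created notification from the archive bucket.
func handler(ctx context.Context, s3Event events.S3Event) error {
	db, err := sql.Open("postgres", os.Getenv("REDSHIFT_DSN"))
	if err != nil {
		return err
	}
	defer db.Close()

	for _, record := range s3Event.Records {
		// COPY the newly written JSON archive file into the target table.
		copySQL := fmt.Sprintf(
			"COPY events FROM 's3://%s/%s' IAM_ROLE '%s' FORMAT AS JSON 'auto' TIMEFORMAT 'epochmillisecs'",
			record.S3.Bucket.Name, record.S3.Object.Key, os.Getenv("REDSHIFT_COPY_ROLE_ARN"),
		)
		if _, err := db.ExecContext(ctx, copySQL); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	lambda.Start(handler)
}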

As you can see, a lot is going on in the figure above: additional Lambda functions, SQS queues, DynamoDB tables, and so on. These components make the data ingestion into Redshift more robust and fault-tolerant.

Closing Notes

Since its release, the event collector has processed 300M+ events and helped us unlock the power of data beyond product analytics use cases. One area where this created a significant impact was capturing impression data in our app. This data is very high-volume and is important to our ML systems for building new models that help improve the user experience on Instawork.

We can now tailor our data collection and analysis to our specific needs, rather than relying on a one-size-fits-all solution like Amplitude.

Here are a few examples of how we have started using it in various areas of our platform:

  1. Backend Platform
    The backend system uses the event logging service to track user behavior, for example logging events when partners change any input on the booking form or when actions are taken in the dispatch service.
  2. Mobile Applications
    Mobile applications use the event logging service to track user behavior and app usage, for example screen views, button clicks, and other user interactions. This data can then be used to optimize app performance and identify areas where the user experience can be improved.

Do you have any use cases where such a logging service could be helpful? Are you overcoming product analytics services’ limitations using a different approach or solution? Let us know in the comments!
