Scaling Data Storage and Data Processing for Machine Learning in Production Systems

Christian Kästner
45 min read · Feb 24, 2022


This chapter covers content from the “Managing and Processing Large Datasets” lecture of our Machine Learning in Production course. For other chapters see the table of contents.

Many systems exceed the resources provided by a single machine and may need to be scaled with changing demands. Most insights into how to design scalable and distributed systems are not specific to machine learning — software architects, performance engineers, and distributed-systems experts have gathered plenty of experience designing scalable software systems, for example, clusters of servers that respond to millions of requests per second and databases that store petabytes of data across many machines. Technologies for distributed data storage and computation are well established and available as commodities, be it as open-source projects or cloud services, and they are commonly used as building blocks of ML-enabled systems.

For many ML-enabled systems, scalability will be an important design challenge to be considered, be it (1) for collecting, storing, and transforming large amounts of training data, (2) for collecting and storing large amounts of telemetry data, (3) for processing large numbers of model inference requests, or (4) for running large distributed jobs to train models. Hence, software engineers and data scientists involved in developing ML-enabled systems benefit from an understanding of how to design scalable systems.

The key principles of how to design scalable systems are fairly well understood. When building software systems, developers will almost always rely on existing abstractions and infrastructure, rather than implementing their own. For example, they may store petabytes of raw data in Azure Data Lake, run hour-long batch jobs using Apache Spark with hundreds of servers hosted by the Databricks cloud infrastructure, or install Apache Kafka on multiple in-house servers for scalable stream processing. However, understanding the key concepts and their tradeoffs will help select the right technologies and design the system architecture in a way that makes it easier to scale later.

In this chapter, we provide an overview of the key ideas, while leaving details to other books. This follows our goal of educating T-shaped team members (see Introduction) who know the key concepts and concerns of their colleagues, in addition to their own specialty, so that they can ask the right questions, communicate effectively, and know when to bring in additional help early in the process.

Running Example: Google-Scale Photo Hosting and Search

Let us again consider image search, but this time at the extreme scale of Google’s Photo Service, where users can upload and organize their photos on a webpage and mobile app. Google does not release detailed statistics, but Google reported in 2020 that they store more than 4 trillion photos from over a billion users and receive 28 billion new photos per week (that’s about 46 thousand photos uploaded per second).

Google Photos shows uploaded photos online and in a mobile app, typically chronologically. It provides various ways to edit photos, for example, by applying filters, often based on machine learning. Google Photos provides many more machine-learning powered functions, such as running object detection to associate images with keywords for search, detecting images that could be cleaned up, suggesting ways of grouping pictures, identifying friends in pictures, and predicting which pictures likely represent happy memories.

Example of the Google Photos web interface searching for “trees” in uploaded personal images.

Back-of-the-envelope math reveals the necessary scale of the operation. Conservatively assuming 3 megabytes of storage per photo and 2020 upload numbers, we need to process 135 gigabytes of images per second. A typical hard disk can write 100 megabytes per second; that is, just storing the images would require writing to at least 1400 disks in parallel. For object detection with a deep neural network, we need to resize each image, convert it into a feature vector, perform a series of matrix computations during model inference, and write the result back to some data storage. Fast object detection models (say YOLOv3) have about 20 to 50 milliseconds inference latency per image in recent benchmarks, so we’d need at least 2780 parallel processes to keep up with predictions for new incoming photos. If we ever decided to run an updated model on all photos, a single sequential process would need 2536 years just for model inference on existing photos.
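For readers who want to check these numbers, here is the same back-of-envelope math as a small Python sketch; the photo size, disk speed, and per-photo processing time are rough assumptions, and the upload and storage figures are the reported 2020 numbers from above.

# Back-of-envelope estimates for the photo service (inputs are rough assumptions)
photos_per_week = 28e9              # reported uploads per week (2020)
stored_photos = 4e12                # reported total photos stored
photo_size_mb = 3                   # assumed average photo size (megabytes)
disk_write_mb_per_s = 100           # assumed sequential write speed of one disk
inference_s_per_photo = 0.02        # assumed fast object-detection latency (20 ms)
processing_s_per_photo = 0.06       # assumed end-to-end time per photo incl. resizing and I/O

photos_per_second = photos_per_week / (7 * 24 * 3600)
ingest_mb_per_second = photos_per_second * photo_size_mb

print(round(photos_per_second))                                   # ~46,000 photos per second
print(round(ingest_mb_per_second / 1024))                         # ~135 gigabytes per second
print(round(ingest_mb_per_second / disk_write_mb_per_s))          # ~1,400 disks writing in parallel
print(round(photos_per_second * processing_s_per_photo))          # ~2,780 parallel inference processes
print(round(stored_photos * inference_s_per_photo / (3600 * 24 * 365)))  # ~2,536 years sequentially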

Some characteristics of the service will be important for design discussions later. The number of pictures uploaded will likely vary by time of day and season and differs significantly between users. Short outages and delays are likely not business or safety critical but reflect poorly on the product. When uploading photos, users reasonably expect to see uploaded photos in their library immediately, sorted by date. The user would probably also expect that the shown total number of photos is updated immediately after the upload. In contrast, users will likely not notice if search does not work immediately for new images. For suggesting filters and tagging friends, different user-interface decisions require different response speeds in the background: If users are to accept filter suggestions or confirm friends within an upload dialog, suggestions need to be ready immediately; alternatively, filter suggestions can be shown in the app later, friends can be tagged in the background without user involvement, or the app could issue a notification when it suggests filters later. Reminding users of likely happy memories is something that is scheduled for later times anyway.

Scaling by Distributing Work

When facing situations where capacity of a specific machine is no longer sufficient to serve the computing needs for a specific task, such as the photo hosting service in our running example, we typically have three options: (1) use more efficient algorithms, (2) use faster machines, or (3) use more machines. Initially, the first two options can be promising. Finding bottlenecks and optimizing implementations can speed up computations substantially, especially if the initial implementation was very inefficient. Buying faster machines (faster or more CPUs, more memory, more storage space) can increase capacity without having to change the implementation at all. However, performance optimizations will only go so far, and better hardware quickly becomes very expensive and runs into physical limits long, long before we reach the Google scale of our running example. Hence, true scalability is almost always achieved by distributing storage and computations across multiple machines.

In the context of scalable ML-enabled systems, pretty much every part of the system may be distributed. In some cases, adopting a distributed design is driven by necessary distribution of the system deployment, such as mobile apps, edge computing, or cyber-physical systems. In many others, it is driven by scalability needs when faced with massive amounts of training data (distributed storage, distributed data processing), with computationally expensive model training jobs possibly involving specialized hardware (distributed model training), with applying model inference to large amounts of data or using model inference while serving many users concurrently (distributed model inference), and with collecting and processing large amounts of telemetry data (distributed storage, distributed data processing). We will often see distributed computations involving network calls among ML and non-ML components. In our running example, we can see all these reasons in a single system: mobile apps, large amounts of training data, expensive model training jobs for learning the object detection and other models, lots of incoming photos in which we want to detect objects, and lots of users producing telemetry data about how they interact with the system, all spread across many ML and non-ML components.

While distributed systems are beneficial and often unavoidable, they come with significant challenges: Distributed computing is inherently complex. It introduces new failure modes, such as dropped or delayed network connections and difficulty finding consensus across multiple machines. For example, users of our photo service would be surprised to find that some photos are sometimes missing when they open the app, just because requests were served by a different server that happened to be down when the photos were uploaded. Developers of distributed systems need to invest heavily into anticipating failures and designing strategies to handle them, from simple retry mechanisms, to redundancy and voting, to transactions, to sophisticated distributed consensus protocols that can provide guarantees even in the face of unlikely network failures. Testing and debugging distributed systems is particularly difficult. We will not be able to guarantee seemingly simple properties such as that every item is processed exactly once, since any part of a distributed system may go down at any time, before it starts processing an item or before it sends us a confirmation that it has processed an item. On the positive side, distributed computing benefits from being able to buy many copies of standard hardware that is cheaper than specialized high-performance hardware. It is easier to scale (often nearly linearly), by simply adding more hardware.

Fortunately, by understanding a few important concepts and tradeoffs it is possible to design and implement distributed systems based on robust abstractions and building blocks. The task of an engineer is then primarily that of making appropriate tradeoff and design decisions and choosing what infrastructure to build upon, without having to implement low-level retry mechanisms or consensus protocols.

Data Storage at Scale

Data storage infrastructure has a long history and is generally well understood. Before we cover mechanics for distribution, let us revisit key abstractions.

Data Storage Basics

The three most common forms of data storage are relational data, document data, and unstructured data.

In relational data models, data is structured into tables where all rows follow a consistent format defined in a schema. The typical language to interact with relational databases is SQL, in which queries declaratively specify what data should be retrieved or modified, allowing the database infrastructure to figure out how to answer the query most effectively. To avoid redundancies in data storage when multiple elements share information, such as multiple photos being taken by the same user or multiple users following the same public photo albums, data in one-to-many or many-to-many relationships is typically normalized. That is, such relations are split into multiple tables referencing each other by keys. When data is queried, such information is joined from multiple tables as needed. Database engines typically also maintain indexes to query data efficiently.

select p.photo_id, p.path, u.photos_total
from photos p, users u
where u.user_id=p.user_id and u.account_name = 'christian'

Example of three tables in a relational data model and an SQL query connecting some of them via keys. The data is normalized in that user and camera data is stored only once but associated with multiple photos. Note that the query only describes what should be returned, but not how the data is retrieved efficiently.

In a document data model, data is stored in key-value pairs in collections, where values can be simple values (e.g., numbers, strings) or complex values such as nested object structures (JSON is commonly used). In most systems, structure is not enforced for values within one collection, providing flexibility. Data in a collection can be efficiently retrieved by the key and often indexes additionally allow efficient lookup of values based on some indexed parts of documents. Data in one-to-many or many-to-many relationships is typically not normalized in the document data model, but often stored redundantly — if not, a reference to a key in a different collection is stored as a field in the document. If joins across multiple collections are needed, the developer typically needs to call the database multiple times to retrieve the objects from multiple collections, rather than having the database infrastructure perform joins. Databases implementing such document models are typically called NoSQL databases.

{
"_id": 133422131,
"path": "/st/u211/1U6uFl47Fy.jpg",
"upload_date": "2021-12-03T09:18:32.124Z",
"user": {
"account_name": "christian",
"account_id": "a/54351"
},
"size": "5.7",
"camera": {
"manufacturer": "Google",
"print_name": "Google Pixel 5",
"settings": "f/1.8; 1/120; 4.44mm; ISO271"
}
}
db.getCollection('photos').find({ "user.account_name": "christian"})

Example data storage in the document model using the JSON format. Rather than as tables, data is stored as objects, with nested inner objects. Notice how user and camera data is stored redundantly with each photo. Data is accessed here by a search over an inner field in the document, within a single collection.

Unstructured data is simply stored in files on disk without any enforced structure and without any efficient access mechanism. For example, log files simply store lines of text without any identifying keys or enforced structure. To find certain values in unstructured data, one typically has to search through all the data and look for entries that match specific patterns — there is usually no index that would allow us to find data easily.

02:49:12 127.0.0.1 GET /img13.jpg 200
02:49:35 127.0.0.1 GET /img27.jpg 200
03:52:36 127.0.0.1 GET /main.css 200
04:17:03 127.0.0.1 GET /img13.jpg 200
05:04:54 127.0.0.1 GET /img34.jpg 200
05:38:07 127.0.0.1 GET /img27.jpg 200
05:44:24 127.0.0.1 GET /img13.jpg 200
06:08:19 127.0.0.1 GET /img13.jpg 200

Example of a log file of a web server indicating which files were accessed when and from which address.
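Since there is no index, extracting information such as view counts from a log like this means scanning every line and matching patterns. A minimal sketch in Python, assuming the log format shown above and a hypothetical file name access.log:

import re
from collections import Counter

# Matches lines like "02:49:12 127.0.0.1 GET /img13.jpg 200" (format assumed from the example above)
log_pattern = re.compile(r"^(\d{2}:\d{2}:\d{2}) (\S+) GET (\S+) (\d{3})$")

view_counts = Counter()
with open("access.log") as log:                       # hypothetical log file
    for line in log:
        match = log_pattern.match(line.strip())
        if match and match.group(3).endswith(".jpg"):
            view_counts[match.group(3)] += 1          # count accesses per image

print(view_counts.most_common(3))                     # e.g., [('/img13.jpg', 4), ...]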

There are many well known tradeoffs between these storage models. For example:

  • Both relational and document storage require a certain amount of planning and preparation to organize and structure data, whereas storing data in unstructured format often just appends data to files with minimal infrastructure. Both relational and document databases can organize the internal data storage to facilitate efficient retrieval; with suitable indexes it is possible to look up specific data very quickly without reading the entire database.
  • Normalization in relational databases helps to avoid redundancies, which reduces storage space and avoids inconsistencies, but comes with additional complexity of joins at query time. In our example above, user information and camera information are stored redundantly, which is space-inefficient and more challenging to update consistently in many places if data ever changes. Relational data models are well suited for expressing many-to-many relationships efficiently, but document databases excel at retrieving documents with nested inner structures without the need for complex queries. If joins are needed in document databases, for example, to express many-to-many relationships, they are often poorly supported, leading to more complex application code where developers implement multiple queries manually without the benefit of internal optimizations in the database engine.
  • Enforced schemas of relational models help with some data quality issues (see also chapter Data Quality) but can seem limiting when data varies frequently in structure or the schema evolves frequently. Schema validation is offered as an optional feature in many document databases, but often not used in practice; it is more common that the client code checks that the retrieved document has the expected structure when using the data retrieved from a query. Unstructured data has no schema to enforce in the first place.
  • Nested object structures in document data models are often a more natural match for the complex objects used in programs, which would otherwise need to be reassembled with complex queries from relational data storage. In contrast, relational models fit well with the tabular data often used in data science.

Data Encoding

To store and exchange data, data is typically encoded, which helps to reduce size for storage and network transfer. The most common types of data encodings are:

  • Plain text (csv, log files): Data is stored in plain text format, readable to humans. This is easy to read and write without additional libraries, but schema enforcement would usually need to be implemented manually.
  • Semi-structured, schema-free (JSON, XML, YAML): Data is encoded as documents of possibly nested key-value pairs. Keys are usually strings and values may be text, numbers, lists, or other documents with key-value pairs. JSON, as in the document database example above, is currently the most common of these formats, is human readable, and can be easily read and written by most programming languages. Typically there is no schema enforcement that would require a document to contain certain keys; ensuring or checking the correct shape of the document is left to the client code that reads it.
  • Schema-based encodings (relational databases, Apache Avro, Apache Thrift, Protocol Buffers, and others): Data is stored in a space-efficient binary format for a given schema. The aim is to read and write data quickly and minimize storage size, not human readability. Since all values follow a fixed schema, only the values but not the keys need to be stored, making storage more compact. Schema compliance is enforced by the database or library used for encoding or decoding the data. Libraries are typically available for all popular programming languages. Schema updates need to be planned with care but are supported in one form or another by all implementations.

In machine-learning contexts, a lot of information is often extracted from log files stored in plain text. Data scientists often store their data tables in plain-text CSV files during early exploration stages. Communication among components in a system often exchanges semi-structured data; in particular, JSON is a common input and output format for many APIs — this includes model inference services. Once operating at scale, switching to schema-based encoding can substantially reduce network traffic and storage size in production systems. Schema-based encoding comes at the cost of needing to learn and use sophisticated libraries, specify schemas, and manage their evolution. This can be a steep learning curve initially but usually pays off in the long run, as schema evolution needs to be managed in one form or another anyway.
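To make the size difference concrete, here is a small sketch comparing a JSON encoding of a record with a hand-rolled fixed-schema binary encoding using Python's struct module; the record fields and the binary layout are assumptions for illustration, not the actual formats used by Avro, Thrift, or Protocol Buffers.

import json
import struct

# A single telemetry record (field names and values are made up for illustration)
record = {"photo_id": 133422131, "user_id": 54351, "views": 17, "size_mb": 5.7}

# Semi-structured encoding: the keys are repeated in every single record
json_bytes = json.dumps(record).encode("utf-8")

# Schema-based encoding: the schema (photo_id: uint64, user_id: uint32, views: uint32,
# size_mb: float) is agreed upon out of band, so only the values are stored
binary_bytes = struct.pack("<QIIf", record["photo_id"], record["user_id"],
                           record["views"], record["size_mb"])

print(len(json_bytes), len(binary_bytes))             # e.g., 70 vs. 20 bytes per record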

Distributed Storage: Partitioning and Replication

When data storage and data processing need to scale beyond the capacity of a single machine, two important concepts are used as the building blocks for distributed data storage solutions: Partitioning is the idea of splitting data, such that different parts of the data are stored on different machines. Replication is the idea of copying data to multiple machines. With partitioning, we distribute data across multiple machines and, with replication, we increase capacity to access that data (throughput) and guard against data loss with redundant storage. Readers familiar with RAID technology for hard drive virtualization will recognize these concepts as natural extensions to distributed systems.

Partitioning. Partitioning is the process of dividing the data such that different subsets of the data are stored on different machines. For partitioning of tabular data, we usually distinguish between horizontal and vertical partitioning. Horizontal partitioning splits the data such that different rows are stored on different machines — for example, we might store photos in our running example such that all photos of a user are stored on the same machine, but photos from different users may be stored on different machines. In contrast, and less common in practice, vertical partitioning splits the data such that different columns (or fields in a document) are stored on different machines — for example, we might store the main photo information on one machine but extended metadata such as camera settings on a different machine, since the camera details are only rarely retrieved. The vertically partitioned fragments on different machines are connected with a unique key for each row. Horizontal partitioning is particularly effective if many queries affect only a few rows; vertical partitioning is useful if different columns are needed in different queries.

Independent of how data is partitioned, some part of the data storage infrastructure or the system at large needs to hold connections to all partitions, know how to look up or compute where the data is stored, send queries or new data to all relevant partitions, and compose the responses. Depending on the partitioning strategy, a client might be able to compute directly which of multiple machines to ask for specific data (e.g., by deriving the machine id from a hash of the key), or a central lookup table stores which machine holds what data. In some use cases, it is clear that only a subset of the partitions are relevant because only certain rows or columns located in certain partitions are needed, such as when looking up a specific photo, but in the worst case a query needs to be sent to all partitions. Typically, a client-side library for the data storage infrastructure will handle all of this transparently, hiding all that complexity from clients, to whom the database appears as a single unit.
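As an illustration of the first strategy, a client can derive the responsible partition directly from a hash of the key, here partitioning photos by user as in the horizontal-partitioning example above; the shard names and the four-way split are made up, and a stable hash is used deliberately.

import zlib

partitions = ["db-shard-0.internal", "db-shard-1.internal",      # hypothetical machines,
              "db-shard-2.internal", "db-shard-3.internal"]      # each holding one partition

def partition_for(user_id):
    # Use a stable hash (crc32) rather than Python's built-in hash(), which is
    # randomized across processes; all photos of a user end up on the same machine.
    return partitions[zlib.crc32(user_id.encode("utf-8")) % len(partitions)]

print(partition_for("christian"))     # the client sends this user's queries only to this shard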

The data is partitioned horizontally with a domain-specific partition criteria based on the customer address in the data. Clients may need to interact with all partitions for queries.

Replication. With replication, a distributed storage infrastructure will store replicas of data on multiple machines. Since all machines have the same data, any machine can respond to a read query. This way, we can easily increase throughput for read requests by replicating data across more machines.

The complexity of replication lies in how to perform write operations: When data is added or changed, this change needs to be reflected in all replicas. Unfortunately, we cannot guarantee that all replicas perform updates at exactly the same time and in the same order — if two clients independently try to change the same value at the same time or nearly at the same time, the different replicas may receive the changes in different orders, resulting in different results. Worse, updates may fail on some but not all replicas for any kind of reason (e.g., out of space, network outage, hardware failure). For all these reasons, there is a risk of inconsistent data across replicas, where different replicas hold different views of the data and return different answers to queries. In our running example, a user might observe that a deleted photo occasionally reappears in the app, because the app reads from different replicas but the delete operation failed on some of them.

There are many different approaches to ensuring consistency across replicas, all with different limitations and tradeoffs, and all implemented in different readily available infrastructure solutions. The most common are:

  • Leader and follower replication is a common design where one replica is named the leader and all others act as followers. All changes are performed on the leader, which holds the official consistent view of the data. All followers receive changes from the leader. It is possible to force synchronization of leaders and followers, but more commonly changes are propagated asynchronously and followers may be slightly out of date (stale) at any point, but will always be in an internally consistent state copied from the leader. Infrastructure of this kind typically has extra functionality to handle an outage of the leader by electing a new leader. This design is built into many popular databases, including MySQL and MongoDB. It allows fast reads, but the leader is a bottleneck for write operations (a sketch of this read/write routing follows after this list).
Leader (primary database) and follower (backup database) design. Clients can read from any database but need to write to the leader.
  • Multi-leader replication substantially increases internal complexity but overcomes the write bottleneck of systems with a single leader. Write operations can be applied to any of multiple leaders and those leaders coordinate changes among each other, typically establishing a protocol to resolve write conflicts, for example, by finding a consensus to decide in which order changes are applied. This results in a consistent definitive version eventually, though not necessarily exactly the one a client would have expected from its local viewpoint. These kinds of protocols to eventually resolve write conflicts are also commonly used in offline apps that only occasionally synchronize with remote databases and in collaborative editing (e.g., Google Docs). The complexity lies in defining a conflict resolution strategy, sometimes in problem-specific ways.
  • Leaderless replication gives up on distinctions between leaders and followers entirely. To ensure that data is not lost when a replica goes down, clients send write operations to multiple replicas, from where the infrastructure will take care of pushing them to all remaining ones. To ensure consistency and detect stale data, however, data is versioned and read operations will ask multiple replicas, detecting problems at query time and repairing them on the fly as needed. This style of replication is common in document databases, such as Amazon Dynamo, Cassandra, and Voldemort. This design sacrifices some read throughput for higher write throughput and a less complex implementation.
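As promised above, here is a minimal sketch of the read/write routing in a leader-and-follower design; connection handling, asynchronous change propagation, and failover are all omitted, and the class and method names are made up.

import random

class ReplicatedStore:
    """Route writes to the leader and reads to any replica (leader or follower)."""

    def __init__(self, leader, followers):
        self.leader = leader               # connection to the primary database
        self.followers = followers         # connections to possibly slightly stale replicas

    def write(self, key, value):
        # All writes go through the leader, which propagates changes to the followers
        # (asynchronously in most setups).
        self.leader.put(key, value)

    def read(self, key):
        # Reads can be served by any replica; followers may return stale data
        # until the leader's changes have propagated.
        return random.choice([self.leader] + self.followers).get(key)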

From a system developer’s perspective, the choice of the right replication strategy (and of infrastructure that implements this strategy) depends on the relative importance of write and read throughput, the importance of immediate consistency, and the allowable lag until all clients see the latest update.

In machine-learning contexts, inconsistencies or slightly outdated versions of data are often acceptable as minor additional noise in the data, to which most machine-learning algorithms are fairly robust. For instance, in our running example, missing a few recent photos in the training data or missing one very recent additional label will likely not affect model accuracy much and might be fixed in the next model update anyway. In general, from a system design perspective, it is often much easier to achieve scalability by designing the system to tolerate some inconsistencies or tolerate slightly stale data from the database, rather than force the database to assure perfect consistency.

Partitioning + replication. Naturally, partitioning and replication are often combined. Partitioning is often needed simply to handle volumes of data that exceed the capacity of a single machine. Replication is an effective way to scale read and sometimes also write throughput and it increases fault tolerance by building in redundancy. By partitioning the data and then replicating all partitions, we can achieve all those benefits (but also inherit all consistency problems from replication) at the cost of needing more hardware.

Even if additional throughput is not needed, adding replication to a partitioned system is important for fault tolerance, as otherwise the failure of any single partition could bring down the entire system. To avoid a single point of failure, each partition should typically be replicated on at least two machines.

Google File System and Hadoop’s Distributed File System (HDFS) are two popular solutions that are based on these principles. Both store potentially very big data blobs (files) identified by a name. Files are broken into chunks, so that a fragment of a file can be identified by name and offset. Chunks are distributed across multiple machines (partitioning) and additionally replicated, with a central server keeping a mapping of where each chunk is stored. A client wanting to read part of a file requests the chunk locations from the server and can then get the file content from any of the servers hosting the chunks. To a client, all of this internal complexity is hidden behind a library.

Architecture image of Google’s file system (GFS) from Ghemawat, Sanjay, Howard Gobioff, and Shun-Tak Leung. “The Google file system.” ACM SIGOPS Operating Systems Review. Vol. 37. №5. ACM, 2003. The GFS master stores the location of chunks but the actual file content is partitioned and replicated across multiple chunkservers.

Transactions. Transactions are a common database concept to perform multiple read and write operations as if they were executed in a single step, all or nothing. This avoids inconsistencies from multiple competing concurrent write operations or write operations that are based on stale data that has been updated between the read and the write. Traditional database infrastructure offers transaction support and strong guarantees. Transaction support can also be provided in distributed database infrastructure, though typically with substantial implementation complexity and runtime overhead.

While transaction support seems like a desirable feature, system designers should ask themselves whether it is absolutely necessary, given the substantial performance overhead. In machine-learning contexts, where systems must be robust to noise and wrong predictions anyway, minor inconsistencies are often acceptable, allowing system designers to opt for simpler and faster technologies that do not offer transaction guarantees. For example, if a filter is applied to a photo in our running example, but the object detection feature has already read the old photo to infer keywords in parallel, the resulting minor inconsistency can likely be tolerated.

Data Warehouses and Data Lakes

Data warehouses and data lakes are two popular and very different design strategies to organize data in a system, both of which are common in ML-enabled systems. Each style is supported by plenty of dedicated tooling and commercial cloud offerings.

Data warehouse. A data warehouse — and its smaller, but conceptually similar, cousin the data mart — is typically a dedicated database that stores data in an aggregated and uniform format optimized for data analysis. A data warehouse is typically separated from operational systems that process and store real-time transactions and is updated only in batches, say once per day. It is optimized for read access and is read-only except for regular bulk updates.

In our running example, a data warehouse may aggregate upload and view counts of photos, how often suggested filters are applied, or what kind of objects are commonly detected, potentially organized by region, the user’s age, the user’s gender, and other account characteristics, and tracked over time. This allows data analysts to create reports on how the system is doing on larger time scales and to drill down if there are certain user groups who may be underserved. These kinds of analyses are performed at scale and computed in regular intervals. Traditionally, data warehouses are used to understand business cases and opportunities by creating reports, not to provide live monitoring.

In the process of entering data into a data warehouse, data is typically extracted from multiple sources, usually other databases, but possibly also log files or event streams. Data is then often transformed and merged into a form that is amenable to the subsequent analysis; for example, sales data may be grouped by day or store. The result is loaded into the database representing the warehouse and indexed, facilitating subsequent queries. Many open-source and commercial tools facilitate all parts of this extract, transform, and load (ETL) process, and they are the key domain of data engineers in an organization.
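A minimal sketch of such an ETL step using pandas, with SQLite standing in for the warehouse database; the file name, column names, and daily grouping are assumptions, and real ETL tools would add incremental extraction, monitoring, and error handling.

import pandas as pd
from sqlalchemy import create_engine

# Extract: read raw view events exported from the operational system (hypothetical file and columns)
views = pd.read_csv("photo_views_export.csv", parse_dates=["timestamp"])

# Transform: aggregate individual view events into daily counts per region
daily_views = (views
               .assign(day=views["timestamp"].dt.date)
               .groupby(["day", "user_region"])
               .size()
               .reset_index(name="view_count"))

# Load: append the aggregated rows into a warehouse table (SQLite as a stand-in here)
warehouse = create_engine("sqlite:///warehouse.db")
daily_views.to_sql("daily_photo_views", warehouse, if_exists="append", index=False)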

Note that all these tasks are very similar to collecting and preparing data in a machine-learning pipeline (see chapter Automating the ML Pipeline) and data scientists can benefit from using robust ETL tools or integrating data engineers in their team, especially when collecting and integrating data from numerous different data sources within an organization. ETL tools often specialize in (incrementally) extracting data from different sources, various transformations for data cleaning, data integration, and normalization, and handling large amounts of data efficiently; they provide robust functionality for automation, parallelization, monitoring, profiling, and error handling.

Of course it is also possible to prepare a dedicated data-warehouse-style database as the main data source for data scientists building ML pipelines, shifting the effort of collecting, cleaning, and transforming the data to a data-engineering team in the organization. In many projects, data scientists will also collect data from existing data warehouses within an organization as training data or as part of their feature engineering, for example, when building a model to predict for the photo service how often users would like to see notifications reminding them of past photos based on aggregate past observations.

Data lakes. A data lake is a repository of data stored in a raw and unprocessed format. The key idea is that data may be valuable in the future in ways that we cannot anticipate today and that any form of data processing might lose details that we might benefit from later. For example, if we only store the most recent version of photos, we may not be able to later identify which filters were applied most commonly or what kind of images are commonly deleted. In some sense, a data lake is a bet that the future benefit of the data, even if we cannot anticipate any benefit today, will outweigh the cost of storing the data. With relatively low storage costs these days, even small future benefits may justify storing data. This bet is motivated by many stories of companies that later discovered the value of their past log data in machine learning projects, for example, when Google discovered that they could use machine learning to build customer profiles from past search queries to create targeted advertisements. The effort for extracting useful information from raw data, by locating relevant parts and transforming them, will be deferred until a use for the data is discovered.

In its simplest form, a data lake is simply an archived directory where raw data is stored permanently in an append-only format, typically primarily log files and sensor readings. Many organizations have reported, though, that they find it useful to invest at least some effort into storing metadata to have some chance of later identifying what data they have, how it was produced, and how to extract information out of it. Dedicated infrastructure to track data and metadata in data lakes and to facilitate discovery is available; for example, DataHub can be used for this.

When information is later extracted from raw data in data lakes, many of the same transformation steps found in ML pipelines and ETL tools are performed, typically in massive batch processing jobs (see below) over a huge amount of data, sometimes even extracting data live as new data is added in a stream processing fashion (see below). Some cloud offerings for data lakes provide dedicated infrastructure for processing the stored data at scale, which can be connected to machine-learning pipelines or data warehouses.

Of course, not every data lake turns out to be successful, and it is easy to amass a huge amount of useless, undocumented, and impossible-to-navigate data in a short period of time. Data lakes are hence often derogatorily called data swamps or data graveyards.

Distributed Data Processing

Just as many systems will need to find scalable solutions for storing data, so will they need scalable solutions for computations, including computations to serve lots of user requests and computations that process large amounts of data.

We will discuss four different strategies: (1) Services and microservices, where client requests are answered immediately as they come in, optimized for quick response times. (2) Batch processing to perform computations over very large amounts of data, which typically take minutes to days and are optimized for throughput. (3) Stream processing, which processes input events from a queue in near real time, also optimized for throughput. (4) The lambda architecture, which combines the other three for certain forms of processing.

(Micro-)Services

As explained in chapter Thinking like a Software Architect, service-oriented architectures (including microservice architectures) are a common way to structure a system by breaking it into cohesive modules (each with their own data storage) that each have a specific functionality and can each be developed and deployed largely independently. Each service responds to requests, one request at a time. Systems using services make heavy use of remote procedure calls, sending arguments to the service over the network and receiving the answer back, often in JSON format or compressed in a schema-based encoding (see above). In modern distributed systems, services are addressed by network addresses (URIs) and offer REST APIs. Scalability is achieved by running multiple instances of a service, distributed by a load balancer, often all managed by cloud infrastructure. Each service can be scaled independently.

Services tend to be optimized for fast response times as other components calling the service are waiting for an immediate response. When a service becomes overloaded, it tends to drop requests or respond with long delays. Clients have to handle error cases that can arise with remote procedure calls, commonly with the help of remote-procedure-call libraries that have features such as retrying requests if they do not receive a timely answer.

The modular nature of common machine-learning components makes them a natural fit for service-oriented architectures. Especially model inference components are natural services that receive inference data in requests and return predictions (see chapter Deploying a Model). In many cases, the bare model inference is deployed as a service (with or without feature encoding), which is then used by other services that provide more application-specific functionality with business logic.
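A minimal sketch of such a model-inference service using Flask; the route, the request format, and the stand-in inference function are assumptions (see chapter Deploying a Model for a deeper discussion).

from flask import Flask, request, jsonify

app = Flask(__name__)

def detect_objects(image_bytes):
    # Stand-in for real model inference; a production service would load the
    # trained object-detection model once at startup and run it here.
    return ["tree", "person"]

@app.route("/detect-objects", methods=["POST"])
def detect_objects_endpoint():
    image_bytes = request.get_data()              # raw image sent by the calling service
    keywords = detect_objects(image_bytes)        # model inference
    return jsonify({"keywords": keywords})        # JSON response consumed by other services

if __name__ == "__main__":
    # A load balancer would distribute requests across many instances of this stateless service.
    app.run(port=8080)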

A machine-learning pipeline may also be deployed as one cohesive service that can be called by other parts of the system to trigger a new execution that trains a new model. The pipeline service itself may request data from other services, such as a service providing training data.

Load Balancing and Request Routing. Service-oriented architectures foster scaling of systems by launching multiple instances of individual services. If a service manages internal state, such as user profiles, this state must be shared by all instances, typically in a database accessed by all service instances. The fact that inference functions are usually stateless (i.e., do not need to store data between requests) makes it easy to run many instances of them without coordination overhead — this is sometimes called Stateless Serving Function Pattern or Model as a Service Pattern.

With multiple instances, requests can be routed transparently to the different instances of the service by a load balancer. Typically routing of requests can happen at the network level, entirely transparent to the clients calling a service. Routing is often connected with management logic that launches or stops additional instances as needed with changing demands. Infrastructure may also dynamically reallocate services to hardware to optimize response times and cost in the presence of limited resources, for example, to move model training tasks to hardware with GPU support and then re-allocate those machines for model inference when done. Overall, a lot of complexity involved in scaling a service is pushed to the network and cloud infrastructure and handled by the operations team.

Mechanisms to dynamically route requests to different instances of a service are not only used for balancing load, but also to route requests to different versions of a service to experiment in production, for example using A/B testing or canary releases (see chapter Quality Assurance in Production).

API Gateway. An API gateway is a common design solution to organize access to (micro-)services and route requests. Commonly, components in a system will call multiple services and need a way to access them. In addition, different services may encode data differently for transport. Rather than storing the address and implementing the specific transport characteristics of each service in each component that uses the service, an API gateway provides a single point of access to all services in the system using a unified remote procedure call interface.

The API gateway acts as a unified access point for all services in a system. If a service is split into multiple services, if a service is changed to use a different protocol for communication, or if a service is moved to a new address, this is hidden entirely from clients. An API gateway can act as the sole place for managing authentication and authorization; it is a good place to implement rate limiting, to provide retry and recovery mechanisms if services are unavailable, and to collect telemetry. The API gateway can be combined with a load balancer (see above) and caching mechanisms. API gateways also often offer request aggregation to bundle multiple requests to different services in a single call between client and API gateway to reduce the number of network round trips. Finally, API gateways often act as a directory of available services to help users discover available functionality in a system. As a downside, the API gateway adds another indirection, a possible bottleneck, and a potential single point of failure.

In terms of traditional object-oriented design patterns, the API gateway acts as a Facade to coordinate access to an entire subsystem, it acts as an Adapter to hide different protocols spoken in the backend by different services from clients, and it acts as a Proxy to implement features like authentication, caching, and error handling.

An API gateway provides a uniform entry point to access multiple services. It typically handles many tasks, such as request routing, authentication, rate limiting, monitoring and logging, some error recovery, protocol translation, and caching.

Batch Processing

Batch processing refers to a pattern of performing computations (including transformations) over very large amounts of data. In contrast to services, batch jobs are not triggered by individual requests and do not provide an immediate response. Instead, they tend to perform long-running computations and write results into some data storage solution from where it can be accessed once the job is complete.

Batch processing in machine learning. In a machine-learning context, batch jobs are common for preparing large amounts of training data as part of executing a machine-learning pipeline when training a model (data collection, data cleaning, feature engineering). Batch processing can also be used to perform model inference for every single item in a large dataset, for example, to use an object detection model to identify keywords for all photos stored in our photo service.

This style of batch processing is a very common strategy to extract information from the vast amounts of unstructured data in a data lake, for example to extract view counts or deleted photos from vast amounts of raw log data. ETL steps in data warehouses may also be performed as distributed batch processes.

Structure of batch-processing jobs. To efficiently perform batch computations at scale it is important to break them down into steps that can each be performed on a subset of the data before combining the results. This way, parts of the computation can be performed in parallel on different machines with different subsets of the data. Common steps include extracting information from raw data, filtering data, and aggregating data.

Consider the example of identifying the most viewed photos per user from very large amounts of log data, produced by many instances of a web server hosting the photo service backend. Assuming that the log is very big and already partitioned, we can split some of the work to independently process each partition in isolation: filter the lines of the log file to include only those relating to viewing photos, extract the file name and user, and count the number of views for each. Once this work has been completed on each partition of the log file, the intermediate results can be combined, sorted, and grouped to identify the final results. If there are lots of intermediate results, these steps can be parallelized similarly to the original parallel analysis. The final results are eventually written into a new file.

The most common paradigm for batch processing is MapReduce, which provides a common programming model and associated infrastructure to execute batch jobs at scale, for example, implemented in the open-source Apache Hadoop. As the naming inspired by functional programming suggests, map steps perform side-effect-free computations on one row of the data at a time (e.g., filtering, data extraction, data transformation) to produce key-value pairs as intermediate results. The map function can be applied to different rows of the data in parallel (on one or multiple machines) and the intermediate results are simply aggregated, grouped by shared keys. The intermediate results are usually much smaller than the original data. Next, reduce steps merge the different intermediate values computed for a key from different data subsets (e.g., when keys are file names and values are download counts, a reduce step would output the sum per key), again producing an intermediate result of key-value pairs. The reduce step can also be executed in parallel on different subsets of the intermediate results that share the same key. Multiple map and reduce steps can be sequenced in different ways. All steps must be free of side effects, producing outputs only based on the inputs, in a way that is repeatable if needed.

MapReduce example for counting views per photo: Map computations are applied near the stored data, one row at a time, identifying the photos; data is then grouped by keys (photo name) in intermediate operations, and reduce operations are applied, one key at a time, to count the number of views.
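A minimal sketch of the map and reduce functions for this view-counting example in plain Python; a real MapReduce or Spark job would run these functions distributed across many machines, but the structure of the computation is the same (the log format follows the earlier example).

from itertools import groupby
from operator import itemgetter

def map_fn(log_line):
    # Emit (photo, 1) for every log line that records a photo view
    _time, _ip, _method, path, _status = log_line.split()
    if path.endswith(".jpg"):
        yield (path, 1)

def reduce_fn(photo, counts):
    # Sum the intermediate counts for one photo
    yield (photo, sum(counts))

log_partition = [
    "02:49:12 127.0.0.1 GET /img13.jpg 200",
    "02:49:35 127.0.0.1 GET /img27.jpg 200",
    "04:17:03 127.0.0.1 GET /img13.jpg 200",
]

# Map phase (parallel per partition), then shuffle (group by key), then reduce phase (parallel per key)
intermediate = [pair for line in log_partition for pair in map_fn(line)]
intermediate.sort(key=itemgetter(0))
results = [result
           for photo, pairs in groupby(intermediate, key=itemgetter(0))
           for result in reduce_fn(photo, (count for _key, count in pairs))]
print(results)   # [('/img13.jpg', 2), ('/img27.jpg', 1)]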

Once computations are expressed in the MapReduce style, the batch computation infrastructure can schedule and orchestrate the computations in the intended order. It will typically allocate computational resources near where data partitions are stored, perform the map step in parallel, transfer and group the intermediate results such that reduce steps can access all results with the same key (called shuffle in MapReduce terminology), and then perform the reduce step once all needed intermediate results are ready, again potentially in parallel partitioned by key. This process is repeated through the entire sequence of map and reduce steps of the batch job. In addition to managing intermediate results and executing each step in the intended sequence once previous steps have finished, the infrastructure will detect failures in individual steps and restart computations that did not complete. The infrastructure typically also manages multiple concurrent batch jobs, orchestrating the different steps of multiple jobs across many machines.

Moving computations to data. A key insight in MapReduce-style computations is that moving computations is cheaper than moving data. Given the massive size of some datasets, it would be too expensive for a process to read all data over the network. This typically even holds if computations require large machine-learned models in a model inference service. It is therefore common to perform computations on (or near) the machines where the data is stored and transfer only the much smaller intermediate results. Replication of data provides additional flexibility, since the computation can be executed on any machine that has a replica of the relevant data.

Beyond MapReduce. Modern dataflow engines like Apache Spark, Tez, or Flink follow a similar approach to older MapReduce infrastructure, but provide more flexibility in the programming model, giving more responsibility to the infrastructure to decide how computations are split and where they are performed. They are fairly common for programming large batch jobs in machine-learning projects these days and broadly supported at scale by commercial cloud offerings, such as Databricks.

Stream Processing

Stream processing offers a very different approach to handle data processing at scale than batch processing. Where batch processing performs computations on a large snapshot of the data, potentially for a long time, stream processing continuously processes data, as data arrives. Data to be processed is typically entered into a queue and processed largely in a first-in-first-out order. Entering data into the queue and processing data from the queue can both be parallelized to scale throughput.

Stream processing overview. Stream processing designs have their roots in event-based architectures, also known as message-passing-style, publish-subscribe, or producer-consumer architectures. Terminology differs between implementations, but the concepts are similar: A message broker keeps a list of topics. Components in a system can act (a) as producers who send messages to the broker for a topic or (b) as consumers who subscribe to a topic to receive new messages that arrive. The broker forwards all messages received on a topic to all subscribers of that topic, buffering messages in a queue if production and consumption speed differ. This design decouples the producers from the consumers, since a producer does not need to know which components consume the message or even how many. Note that in contrast to services, the producer does not expect or wait for a response; once the message is sent, their responsibility ends; at most they may subscribe to potential answer messages on a different topic.

Multiple processes producing and consuming messages on three topics. Most processes have multiple instances that process messages in parallel. Several processes consume messages on one topic and produce messages on a different topic.

This style of system architecture through message passing is common in different contexts, typically when components continuously respond to new events or new data. In programming user interfaces, components often subscribe to events such as button or mouse clicks and react when such an event occurs. In business systems, messages may refer to business events, such as sales, which may trigger multiple other independent actions in the system, such as recording the sale, sending an email, updating the user profile, and logging. In the context of machine learning, messages can refer to all kinds of data passed around in the system, including training data and user data for model inference.

Stream processing typically has much lower throughput than batch processing, but much better latency. Instead of waiting hours or days for a result on a large amount of data, we get near-real-time results as moderate amounts of additional data arrive.

Stream processing and machine learning. In ML-enabled systems, stream processing can be an essential strategy for scaling many different parts of the system.

Systems that continuously collect training data, for example from log files, from user actions, or by continuously scraping APIs, can use message brokers to funnel data through the machine-learning pipeline. At the source, each new data point can be fed as a message into the broker. Then data cleaning code can process each data point, sending the resulting data to a new topic. Subsequently, feature engineering code can process the cleaned data, one row at a time, extracting features and writing the resulting feature vectors back into a new topic. From there a new consumer may receive the feature vectors and write them into a database for later training in a batch process. Importantly, all steps may execute with different throughputs, where the stream-processing architecture allows scaling each step independently. For example, if feature engineering is expensive, we can simply add more machines to help with this step.

Stream processing is a particularly good match for systems that are learning continuously with new data using incremental machine-learning algorithms. Here the model training component can subscribe to a topic providing new training data (e.g., feature vectors) and continuously update the model. For example, the model detecting friends in photos might be updated continuously each time the user tags an image or confirms a suggestion by the model.

Stream processing can also be used for model inference, when model predictions should be reflected in data soon after it has been added or changed, but the prediction is not needed immediately. In our running example, we want to add keywords to each image soon after it has been uploaded, but we do not need the keywords immediately for a response. In this case, we can add new photos as messages to a topic queue and let model-inference workers process these messages one at a time, with many workers in parallel. With varying load in the message queue, we can dynamically allocate more or fewer workers as needed.
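A minimal sketch of such an inference worker using the kafka-python client; the broker address, the topic names, and the stand-in detect_objects function are assumptions, and many instances of this worker can consume from the same topic in parallel.

import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer("new_photos",                         # hypothetical topic of freshly uploaded photos
                         bootstrap_servers="broker:9092",      # hypothetical broker address
                         group_id="object-detection-workers")  # workers in one group split the messages
producer = KafkaProducer(bootstrap_servers="broker:9092")

def detect_objects(photo_path):
    # Stand-in for calling the actual object-detection model
    return ["tree", "person"]

for message in consumer:                                       # blocks and processes messages as they arrive
    photo = json.loads(message.value)                          # e.g., {"photo_id": ..., "path": ...}
    keywords = detect_objects(photo["path"])                   # model inference, the slow part
    result = {"photo_id": photo["photo_id"], "keywords": keywords}
    producer.send("photo_keywords", json.dumps(result).encode("utf-8"))   # hypothetical downstream topic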

Finally, stream processing might be used to collect and analyze telemetry data. The system might feed log data about model inference and user interactions into the message broker, from where a monitoring infrastructure as a consumer analyzes the events and produces sliding-window reports about model quality. For example, we might track how often a user accepts the suggested friends or analyze how often a user tries multiple search queries in short succession without clicking on any photos, which we use as a proxy for being unhappy with the original search results (see more on analyzing telemetry in chapter Quality Assurance in Production).

Stream processing infrastructure. The stream processing infrastructure typically takes on substantial responsibility and is a key ingredient for scaling the system. It is possible to implement custom message brokers or build them on top of databases, but specialized infrastructure is typically heavily optimized, highly customizable, and relatively easy to adopt. For example, typical implementations provide the following functionality:

  • The message broker can be distributed itself to achieve higher throughput, using various partitioning and replication strategies. Some implementations connect producers and consumers entirely without a central broker.
  • To speed up processing, most infrastructure supports multiple instances of consumers to process messages from a topic in parallel, where each instance receives a subset of the messages. Here, the broker acts as a load balancer. Similarly, multiple instances of producers can write to the same topic.
  • Monitoring the number of buffered messages or the delay with which messages are processed can be used by the infrastructure to automatically adjust the number of instances.
  • The broker tracks which messages have already been received by instances of each consumer to forward only new messages, even if multiple different consumers process messages at different speeds.
  • The broker may persist buffered messages to recover from failures. It may remove messages once they have been received by all consumers or after a maximum time. Alternatively it may store old messages for new consumers to receive messages sent before they initially subscribed (adding an element of persistent data storage familiar from databases).
  • Different implementations offer different error handling strategies. Producers may or may not request acknowledgement that their message has actually been received. Consumers may confirm that they have received a message or that they have finished processing a message, allowing different retry designs in cases of failures.

Developers that use a stream processing infrastructure need to make important decisions about how to handle errors. Where batch processing requires that computation steps are side-effect free such that parts of the computation can be repeated if necessary, developers of stream-processing applications have more flexibility but also more responsibility. Depending on how the broker is configured or used, developers of consumers can guarantee either (a) that each message is processed at least once, while it may be processed multiple times when the system recovers from errors, or (b) that each message is processed at most once, while messages may be lost as part of errors — guaranteeing exactly-once processing is not possible. In our running photo example, lost messages likely do not matter much for updating the photo count (at most once), whereas running object detection multiple times is not a problem for tagging images with keywords (at least once). If consumers perform important transactions, such as charging credit cards for ordered photo prints, each message should probably be processed at least once, but external mechanisms (e.g., based on unique transaction identifiers) need to ensure that processing a message multiple times does not lead to multiple credit card charges.

These days, Apache Kafka is a popular broker implementation choice for stream processing. It is designed for massive throughput and scales well by distributing the message broker itself. However, many competing message broker implementations exist and most commercial cloud service providers offer their own too.

Documenting data flows. The decoupled, message-based communication through a broker provides a lot of flexibility in composing systems from loosely coupled components, but the resulting system architectures are known to be challenging to understand and debug. In traditional service-oriented systems, a client sends a request and waits for a response, but with stream processing the producer sends a message into the void and a consumer hopes that somebody produces the right messages in the right format. A single message might be processed by many consumers, who in turn produce new messages, resulting in long pipelines and graph structures.

Example of a part of the possible structure of processes interacting on topics in the photo service example. Uploads and server logs are streamed into the system where multiple processes analyze them and produce other event streams, again consumed by other processes, including model inference, model training, and model monitoring components.

Documenting data flows in a system to identify which components produce and consume what kind of messages across different topics helps to keep an overview. Most message brokers provide some monitoring support to observe which topics are in use, how many messages are produced in each topic, and how far behind each consumer is; it is often also a good idea to observe trends for all of these. In addition, it is often prudent to document the data format of messages in the various topics and explicitly manage its evolution. Some message brokers explicitly support enforcing schemas, and schema-based encoding (see above) can be added for those that do not, as sketched below.
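
For example, the message format for a topic can be documented and enforced with an explicit schema; here is a minimal sketch using an Avro schema with the fastavro library, with illustrative topic and field names.

import io
from fastavro import parse_schema, schemaless_writer, schemaless_reader

# Explicit, versionable schema for messages in the (hypothetical) photo_uploads
# topic; producers and consumers share this definition.
schema = parse_schema({
    "name": "PhotoUploaded",
    "type": "record",
    "fields": [
        {"name": "photo_id", "type": "long"},
        {"name": "user", "type": "long"},
        {"name": "path", "type": "string"},
    ],
})

# Producer side: encode a message against the schema before sending it.
buffer = io.BytesIO()
schemaless_writer(buffer, schema,
                  {"photo_id": 133422131, "user": 54351,
                   "path": "/st/u211/1U6uFl47Fy.jpg"})
payload = buffer.getvalue()

# Consumer side: decode with the same (or a compatible, evolved) schema.
event = schemaless_reader(io.BytesIO(payload), schema)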

Lambda Architecture

The lambda architecture is a recent and popular combination of the three strategies: services, batch processing, and stream processing. In a classic big-data-analysis setting, it uses batch processing to perform accurate computations over all data in regular intervals (e.g., totaling all business transactions by region), stream processing to incrementally update results (e.g., add recent transactions to the totals), and services to answer queries with the most recent results (e.g., respond with total transactions for a specific region). In the context of machine-learning, it might use batch processing to regularly train models on all data, stream processing to incrementally update models with new data between batch jobs, and services to respond to model-inference requests with the latest model.

Immutable append-only data (event sourcing). The lambda architecture relies on modeling data as a stream of edit events, stored in an append-only log of events, a style often called event sourcing. That is, rather than updating individual rows in a table, we record each change to a row as an event. Storing data as a list of changes has a long tradition in some version control systems (see chapter Versioning, Provenance, and Reproducibility) and as a recovery mechanism in database implementations.

This style of data storage allows the system to reconstruct data at any point in time: The most recent version of data can be recovered by replaying all change events from the very beginning. Older versions can be recovered by replaying the event history only partially. Keeping the history of data can be beneficial when trying to analyze past actions, such as how often users changed the description of a photo or what kind of photos they edited or deleted. The drawbacks of this approach are that replaying the history to retrieve the latest data may be very expensive and that additional storage space is needed, especially if changes are frequent. Typically, a view of the most recent data is kept in memory or on disk for fast access, and snapshots at certain points in time may be stored as well to enable faster processing.

addPhoto(id=133422131, user=54351,
    path="/st/u211/1U6uFl47Fy.jpg",
    date="2021-12-03T09:18:32.124Z")
updatePhotoData(id=133422131, user=54351,
    title="Sunset")
replacePhoto(id=133422131, user=54351,
    path="/st/x594/vipxBMFlLF.jpg",
    operation="/filter/palma")
deletePhoto(id=133422131, user=54351)

Example of a log of events describing changes to data.

Some parts of a system may already produce data naturally in an append-only form, such as log files and sensor readings. Data lakes are also a good match for the lambda architecture, as data stored in data lakes is almost always append-only. Values derived from such data, such as how often photos have been viewed or feature vectors for a machine-learning job, would often be cached in a database to avoid having to repeatedly process the entire dataset.
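
To make event sourcing concrete, here is a minimal sketch that replays a log like the one above to reconstruct the current photo metadata. The event structure mirrors the example and is purely illustrative; a real system would read the events from a broker or database.

def replay(events):
    """Reconstruct the latest photo metadata by replaying change events."""
    photos = {}
    for event in events:
        kind, data = event["kind"], event["data"]
        if kind == "addPhoto":
            photos[data["id"]] = dict(data)
        elif kind in ("updatePhotoData", "replacePhoto"):
            photos[data["id"]].update(data)
        elif kind == "deletePhoto":
            photos.pop(data["id"], None)
    return photos

log = [
    {"kind": "addPhoto", "data": {"id": 133422131, "user": 54351,
                                  "path": "/st/u211/1U6uFl47Fy.jpg"}},
    {"kind": "updatePhotoData", "data": {"id": 133422131, "title": "Sunset"}},
    {"kind": "deletePhoto", "data": {"id": 133422131}},
]
assert replay(log) == {}                                  # added, edited, then deleted
assert replay(log[:2])[133422131]["title"] == "Sunset"    # state at an earlier point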

Three Layers of the Lambda Architecture. The lambda architecture consists of three layers: the batch layer, the speed layer, and the serving layer.

The batch layer performs computations over the entire data as a batch process. For example, it may compute the view count of every image from log data or train the friend detection model on all available training photos. This is designed for large computations on a snapshot of the entire dataset (at a specific point in time) and benefits from batch computing infrastructure discussed above. Batch processing is a good fit for processing large amounts of data in a data lake. The batch layer typically runs the large batch job in regular intervals, say daily or weekly.
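
As a sketch of such a batch job, the following counts photo views from raw log files with PySpark; the log format and storage paths are hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("daily-view-counts").getOrCreate()

# Each log line is assumed to look like: "2021-12-03T09:18:32 view 133422131"
def photo_id(line):
    return line.split()[2]

view_counts = (spark.sparkContext
    .textFile("hdfs://logs/views/*")            # all view logs in the data lake
    .filter(lambda line: " view " in line)
    .map(lambda line: (photo_id(line), 1))
    .reduceByKey(lambda a, b: a + b))           # distributed "reduce" step

view_counts.saveAsTextFile("hdfs://results/view_counts_latest")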

The speed layer uses stream processing to update the computation with every additional data row that was added to the append-only data storage since the snapshot on which the last batch job was computed. For example, it may update the view count of each photo or incrementally update a friend-detection model with additional data as users tag photos or confirm suggestions. Incremental computations are often approximations or are more challenging to parallelize, and system designers often accept minor inaccuracies, since the result will be replaced regularly with the more accurate computation from the batch job. For example, multiple consumer processes updating view counts may overwrite each other’s changes — while this could be avoided with distributed transactions, the much higher throughput from avoiding transaction overhead may be considered more important, and minor inaccuracies in view counts are not considered problematic since accurate view counts will be recomputed in the next batch job anyway. Similarly, incremental updates to a machine-learned model, especially when computed in a distributed fashion, may be of lower quality than training the entire model from scratch (especially if training data is changed repeatedly, since it is hard to unlearn a specific, now-changed training data point) — but it still likely provides better predictions than using stale models from the last batch job.

Finally, the serving layer provides the results from the batch and speed layers to clients, knowing how to access the internal results of the computations. In our examples, it may respond with view counts for specific photos or execute model inference with the latest revision of the friend detection model.
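
Here is a minimal sketch of such a serving-layer endpoint with Flask, combining the latest batch result with the increments accumulated by the speed layer since the last batch job; the in-memory dictionaries stand in for whatever store the batch and speed layers actually write to.

from flask import Flask, jsonify

app = Flask(__name__)

# In a real system these would be backed by a database or key-value store.
batch_view_counts = {133422131: 1200}   # written by the nightly batch job
speed_increments = {133422131: 17}      # updated continuously by the speed layer

@app.route("/photos/<int:photo_id>/views")
def views(photo_id):
    total = (batch_view_counts.get(photo_id, 0)
             + speed_increments.get(photo_id, 0))
    return jsonify({"photo_id": photo_id, "views": total})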

Lambda architecture with three layers for serving requests, processing events, and batch processing on all data.

Notice how the lambda architecture combines the strengths of the different designs, as each layer focuses on different qualities: The batch layer performs large-scale computations and produces the most accurate results, but the results may already be outdated by the time the job completes. The speed layer focuses on near-real-time updates at lower throughput, while potentially sacrificing some precision with approximations. The serving layer focuses on answering requests with low response times based on results or models precomputed with batch and stream processing. If it fits the system requirements, this kind of architecture makes it possible to balance latency, throughput, and fault tolerance at scale.

Distributed Machine-Learning Algorithms

Many machine-learning algorithms are computationally intensive, and deep-learning approaches especially are often used at substantial scale, ingesting large amounts of data and performing computations for weeks or months on specialized hardware. As a consequence, many modern implementations of machine-learning algorithms take advantage of multi-core machines and can distribute training jobs across multiple machines.

For illustration, OpenAI’s GPT-3, released in 2020, is a deep learning model with 96 layers and 175 billion weights. During training and inference, the model itself takes up 700 gigabytes of memory. It was trained with 570 gigabytes of compressed text data. It is estimated that training would have taken 355 years if done sequentially; instead, it was performed on a cluster with 285,000 CPU cores and 10,000 GPUs built specifically for large machine-learning jobs. Based on contemporary pricing of cloud resources, it was estimated that training this model once would have cost nearly 5 million US dollars.

While we will not go into technical details of distributed machine learning, we will provide some intuition based on the common distribution strategy of a parameter server, used, for example, in TensorFlow.

In essence, as briefly explained in chapter Quality Attributes, a deep neural network consists of a large number of parameters organized in matrices, one matrix per layer (parameters are the values to be learned; in the final learned model, they are constants). The backpropagation algorithm used for learning takes one training input-output pair, computes a prediction for the input (through a sequence of matrix multiplications with the model parameters), and then adjusts all parameters a tiny bit to push the model’s output closer to the expected outcome for that input (in a process that requires more math). This process is repeated over and over again with different input-output pairs until the parameters stabilize. This approach of incrementally adjusting parameters is called gradient descent.
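
To make the gradient-descent loop concrete, here is a minimal sketch for a single linear layer rather than a deep network, using numpy with synthetic data; it illustrates the idea, not the implementation of any particular framework.

import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 5))                       # synthetic inputs
true_w = np.array([2.0, -1.0, 0.5, 0.0, 3.0])
y = X @ true_w + rng.normal(scale=0.1, size=1000)    # expected outputs

w = np.zeros(5)                           # model parameters to be learned
learning_rate = 0.1
for _ in range(100):
    for i in range(0, len(X), 32):        # iterate over mini-batches of pairs
        xb, yb = X[i:i+32], y[i:i+32]
        error = xb @ w - yb               # prediction vs. expected outcome
        gradient = xb.T @ error / len(xb) # direction that increases the error
        w -= learning_rate * gradient     # adjust parameters a tiny bit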

To distribute gradient descent, the model parameters are copied to multiple worker machines. Each worker machine takes a subset of the input-output pairs and performs gradient descent locally on the model parameters, adjusting them in each learning step. When done with a batch of data, the parameter adjustments from the various workers are merged, so that all workers start with the same parameters for the next batch. Instead of having all workers coordinate with all other workers, workers talk to a server that stores and merges the model parameters, the parameter server. An additional key innovation to save bandwidth for large models is not to transmit updates for all model parameters in each synchronization step (this would have been 700 gigabytes for GPT-3 after each batch), but to transmit only parameter differences for the (usually few) parameters with changes above a certain threshold. In addition, to balance the load, the parameter server itself can be distributed, with multiple servers each storing only a subset of the model parameters.
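
The following sketch simulates the data-parallel idea in plain numpy: each “worker” computes a gradient on its own shard of the data, and a simulated “parameter server” merges the updates by averaging. Real infrastructure additionally handles networking, scheduling, fault tolerance, and the compressed, thresholded updates described above.

import numpy as np

rng = np.random.default_rng(1)
X = rng.normal(size=(1000, 5))
true_w = np.array([2.0, -1.0, 0.5, 0.0, 3.0])
y = X @ true_w + rng.normal(scale=0.1, size=1000)

num_workers, learning_rate = 4, 0.1
shards = list(zip(np.array_split(X, num_workers), np.array_split(y, num_workers)))

w = np.zeros(5)                                   # held by the parameter server
for _ in range(200):
    # Each worker computes a gradient on its shard using the current parameters.
    gradients = [xs.T @ (xs @ w - ys) / len(xs) for xs, ys in shards]
    # The parameter server merges the workers' updates (here: simple averaging).
    w -= learning_rate * np.mean(gradients, axis=0)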

At a high level, this approach is similar to classic batch processing where work is distributed to different workers, who each process a subset of the data and afterward integrate their results (the “reduce” in map-reduce). In contrast to traditional batch processing jobs, merging parameter updates and sending only partial updates leads to approximations and noise that make learning nondeterministic. As in batch processing infrastructure, implementations usually provide substantial support for coordinating and scheduling the work, managing network traffic, managing multiple independent jobs, and error handling.

The gradient descent learning strategy of neural networks is also naturally incremental. Hence learning can be performed in a stream processing setting as well, where new data is fed into the learning process continuously and the latest model revision can be received from the parameter server at any time.

Architecture diagram of the TensorFlow Parameter Server architecture from Li, Mu, et al. “Scaling distributed machine learning with the parameter server.” OSDI, 2014. On the top, the server group holds the model parameters and merges parameter updates from the workers. Workers perform gradient descent learning on subsets of the data and send updates to parameters back to the server, batched and compressed to save bandwidth.

Performance Planning and Analysis

When operating large and distributed systems, planning and monitoring are key to handling large loads. Ideally, workloads are planned ahead, analyzing what work can easily be scaled by running multiple instances of a service, and where bottlenecks may occur (often in databases). Software architects and performance engineers may then be able to perform simulations or analytical computations (e.g., using queuing theory) to estimate how different designs of a system will scale before it is built.
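
As a back-of-the-envelope sketch of such an analytical estimate, the classic M/M/1 queueing formula predicts the average response time of a single server from its arrival and service rates; real analyses would use richer models, but even this sketch shows how quickly response times degrade near saturation.

def mm1_response_time(arrival_rate, service_rate):
    """Average response time of an M/M/1 queue (rates in requests/second)."""
    assert arrival_rate < service_rate, "the server is overloaded"
    return 1.0 / (service_rate - arrival_rate)

# A model-inference service that can handle 100 requests/second:
print(mm1_response_time(50, 100))   # ~0.02 s at half load
print(mm1_response_time(95, 100))   # ~0.2 s when close to saturation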

More commonly, teams without deep expertise in scalable systems invest in some design by picking technologies and designing the system somewhat with future scalability in mind, but tackle actual performance problems only when the need arises. As we will discuss in chapter Technical Debt, this could be justified as prudent technical debt to get to market quickly without too much upfront investment.

At the component level, various forms of performance testing exist to test how components behave under load and what throughput can be expected. Typically, tools like Apache JMeter send a fixed or increasing number of requests to a component to measure (a) response time, (b) throughput, and (c) resource use at different loads. For example, load testing checks whether a component can handle the maximal expected load, scalability testing explores how the component behaves as load increases, soak testing explores whether the component remains stable when overloaded for some time, and stress testing intentionally overwhelms the entire system to test graceful degradation and recovery. Such tests can also be automated and performed on every release, ensuring that a change to a component does not unexpectedly slow down the entire system.
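
Dedicated tools like Apache JMeter are usually the better choice, but the core idea of a load test can be sketched in a few lines of Python that fire concurrent requests at a hypothetical endpoint and report latency and throughput.

import time
from concurrent.futures import ThreadPoolExecutor
import requests

URL = "http://localhost:8080/photos/133422131/views"   # hypothetical endpoint

def timed_request(_):
    start = time.perf_counter()
    requests.get(URL, timeout=5)
    return time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=50) as pool:        # 50 concurrent clients
    latencies = list(pool.map(timed_request, range(1000)))
elapsed = time.perf_counter() - start

print(f"throughput: {len(latencies) / elapsed:.1f} requests/second")
print(f"median latency: {sorted(latencies)[len(latencies) // 2] * 1000:.1f} ms")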

At the system level, observability through performance monitoring becomes essential. To this end, developers instrument the various components of the system to track server and network load, calls among components and their response times, message queues in message brokers, and many other performance indicators. Various off-the-shelf tools exist to perform the instrumentation, collect the results, and produce reports and dashboards, such as AppDynamics. Setting up monitors for average response time, throughput, and downtime of different parts of the system is not difficult, but it requires some initial setup effort.
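
As a minimal sketch of such instrumentation, the prometheus_client library can expose request counts and response-time histograms for a monitoring system to scrape; the metric names, model name, and simulated inference are illustrative only.

import random, time
from prometheus_client import Counter, Histogram, start_http_server

REQUESTS = Counter("inference_requests_total",
                   "Number of model inference requests", ["model"])
LATENCY = Histogram("inference_latency_seconds",
                    "Model inference response time", ["model"])

def predict(photo):
    REQUESTS.labels(model="friend_detection").inc()
    with LATENCY.labels(model="friend_detection").time():
        time.sleep(random.uniform(0.01, 0.05))   # stand-in for model inference
        return []

if __name__ == "__main__":
    start_http_server(8000)   # metrics served at http://localhost:8000/metrics
    while True:
        predict(None)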

Example screenshot of an AppDynamics FlowMap showing the status and interaction of components in a distributed system. Source: AppDynamics Knowledgebase.

A solid performance monitoring infrastructure provides a strong foundation to observe long-term trends, to automate the addition and removal of additional servers as needed, to identify problems before they become serious, to alert on issues that require manual intervention, and to automate releases and conduct A/B testing (see chapter Quality Assurance in Production).

Summary

Machine learning is often used in data-intensive systems at scale. It is common that such systems exceed the capabilities of a single machine with huge amounts of data and high demands for computational resources, be it for data processing and feature engineering, model training, or model inference. In such settings, distributing storage and computations becomes inevitable.

Distributed systems come with substantial engineering challenges. Fortunately, developers do not need to start from scratch but can build on top of powerful abstractions and corresponding infrastructure that handle many of the complicated parts of building systems that are truly scalable and observable. Still, a basic understanding of the key concepts and tradeoffs is important to select appropriate techniques and design systems that can truly scale.

Just like software engineers in production projects with machine-learning components will benefit from understanding key concepts of machine learning, they and data scientists will benefit from understanding key abstractions provided for databases, distributed data storage, and distributed computation, including services, batch computation, and stream processing. Understanding the concepts behind current buzzwords such as data lakes and the lambda architecture also helps in judging whether they are suitable for a given project. Whether it is eventually software engineers, data scientists, or dedicated data engineers and operators who select and operate specific technologies, establishing a shared understanding and designing systems anticipating tradeoffs and limitations will help the entire team work together toward a scalable system.

Many of these abstractions and infrastructure concepts are also described as architectural styles, architectural patterns, or design patterns, including microservice architecture, publish-subscribe architecture (stream processing), the lambda architecture, data lakes, and the batch serving pattern.

Further Reading
