On the importance of future-proof data architectures, and how Apache Spark helps to build them

Ivan Trusov
6 min read · Apr 10, 2023


In the Hold by David Bomberg, 1914. Image source — Wikiart

In today’s world, we generate and process a vast amount of data every day, and it’s becoming increasingly challenging to manage it all efficiently. Future-proof data architectures that can handle large volumes of data are essential for businesses and organizations.

In this blog post, I would like to highlight how Apache Spark and its solid set of APIs can ease some technical burdens and make the life of Data Engineers slightly better.

N.B. This post is experimental and doesn’t contain the full code version, merely the snippets and concepts.

The metaphysics of data

First, we shall consider the concept of data itself and the directions of its usage. If you check any relevant source, you’ll find something like this:

self-made in excalidraw.com

Unfortunately, it’s hard to prove at a large scale, but I tend to think that raw data itself is usually not the main driver of growth in size.

It grows when it gets combined and reshaped.

In my practice I see that the most valuable data is forged out of the following:

  • Several sources are combined (e.g. enriched or cross-checked)
  • Data appears in time for the business to make a decision
  • Data is properly transformed and quality assured

On the implementation side, we have pipelines whose goal is to deliver data that meets the requirements described above.

And the Data Engineers writing these pipelines usually want efficiency in their day-to-day work, for instance:

  • Write expressive and efficient code
  • Write code that is extensible enough to catch future use cases with ease
  • Write code that will be reliable and won’t wake you up in the middle of the night with an alert

Engineering knowledge tends to be forged and framed out of the fact that people are solving the same problems in different domains — and this is how frameworks are born.

In the data space, it’s common to use OSS data processing frameworks (aka ETL frameworks), and one of the most widely adopted in the world is Apache Spark.

And I believe that, amongst the many reasons for its incredible adoption, three are the most noticeable:

  • Abstractions when needed
  • Scalability
  • Flexible scheduling

The combination of these earns the architecture of data applications the future-proof label. Therefore, I would like to highlight each of these capabilities.

Abstractions when needed

Apache Spark allows you, to a great extent, to write code without being bound to technical specifics.

By this, I mean that it becomes possible to write heavily domain-oriented code, with Spark APIs used only when needed, without forcing users to build everything around the framework.

Let’s consider the following example: I have a process that reads raw IoT data from a source. I want to apply some changes to the input, join it with some other data and save the result somewhere.

It is technically possible to make it like this:

import pandas as pd
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()

    raw_iot_events = spark.read.format("some").load(source_path)
    devices = spark.createDataFrame(pd.read_csv(devices_path))

    iot_with_devices = raw_iot_events.join(devices, on="device_id")
    cleaned_iot = iot_with_devices.where("value > 1")
    cleaned_iot.write.somewhereElse()  # placeholder for the actual writer

And this is valid code. However, it’s not that nice from a future perspective, for several reasons:

  • It doesn’t contain any proper domain model
  • It doesn’t abstract the physical details
  • And there are at least two more issues considered later in this post.

Let’s rewrite it into something that is actually more domain-driven:

class Pipe:
    # builder-style methods; each is expected to return self for chaining

    def source(self, source_definition):
        ...

    def destination(self, destination_definition):
        ...

    def join(self, join_rule):
        ...

    def transform(self, transformer):
        ...


class LaunchScheduler:
    ...


class IotProcessingTask:

    def launch(self):  # application layer - no physical details here
        pipe = (
            Pipe()
            .source(RawEvents(conf.events))
            .source(RawDevices(conf.devices))
            .join(EventsAndDevices)
            .transform(EventsCleaner)
            .destination(CleanEvents)
        )
        pipe.launch_with(LaunchScheduler(conf.launch))


if __name__ == "__main__":
    task = IotProcessingTask()
    task.launch()

I’ve intentionally omitted the implementation details to underline the flexibility of the solution and its independence from the physical implementation (because the framework is a physical detail, not a domain-related one).

Why do this? Because we want our code to be properly abstracted for the future (in this context, stable against external changes).

Let’s consider the input format as an axis of potential change. In this case, we simply need to change the configuration properties; no code changes are required.

For instance, we can implement a class that is populated from the config at runtime (it can even be a pydantic BaseModel):

from typing import Literal, Optional
from pydantic import BaseModel


class TableCoordinates(BaseModel):
    catalog: Optional[str]
    database: Optional[str]
    table: str


class RawEvents(BaseModel):
    source_type: Literal["file", "table"]
    source_path: Optional[str]
    source_format: Optional[str]
    source_table: Optional[TableCoordinates]

Into these classes, we can easily pass any kind of payload from a static YAML/JSON/other config format, without needing to change a single line of code.
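As a rough sketch of how this could look (assuming PyYAML is available; the payload and keys below are hypothetical), switching the source from a file to a table becomes purely a configuration concern:

import yaml  # assuming PyYAML is installed

# hypothetical config payload; in practice it would live in a static file
raw_conf = """
events:
  source_type: table
  source_table:
    catalog: main
    database: iot
    table: raw_events
"""

conf = yaml.safe_load(raw_conf)
events_source = RawEvents(**conf["events"])  # validated, typed config object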

Another potential axis of change is the logic of how we work with the entity. For example, we get a new field that we need to handle in some specific way:

from pyspark.sql import functions as F


class RawEventsTransformer:  # add it as a mixin to the application layer
    def transform(self, df):
        return df.withColumn("event_id", F.col("event_id").cast("bigint"))

This change is fully isolated from the rest of the codebase, leading us to loosely coupled components that are simple to reuse further.
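For illustration, one possible way to wire such a mixin into the application layer could look like this (a sketch that reuses the hypothetical names from the earlier example):

class IotProcessingTask(RawEventsTransformer):  # the mixin supplies the new behavior

    def launch(self):
        pipe = (
            Pipe()
            .source(RawEvents(conf.events))
            .transform(self.transform)  # only this class knows about the new field
            .destination(CleanEvents)
        )
        pipe.launch_with(LaunchScheduler(conf.launch))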

Scalability

Now we’re moving to the next dimension of data pipelines, namely scalability.

There have been a lot of discussions about using libraries for single-node tasks in the ETL chain. While I appreciate the effort to explore new solutions, I often find myself questioning this approach.

My reason to worry is that it’s a “working now” solution instead of a future-proof solution.

# this goes OOM as soon as the source significantly grows in size
some_source = single_node_package.read_csv(some_source_path)
transformed_in_single_node = some_single_node_transforms(some_source)

...
df_in_spark = spark.createDataFrame(transformed_in_single_node)

As I’ve pointed out in the metaphysics part — data has a tendency to grow.

How soon could it happen that the some_source dataset no longer fits in memory? What would happen if we simply add more rows, or change the logic of how the dataset is populated, e.g. add explicit versioning of the records in this source?

The solution above will blow up with an OOM, leading developers to incidents and a need to refactor this piece. Indeed, there are workarounds, e.g. upscaling the single node to an acceptable RAM amount. But why would we introduce tech debt that requires workarounds?

Instead, there is a solution that will be scalable by default:

some_source = spark.read.format("some_format").load(some_source_path)
transformed_in_distributed = distributed_transforms(some_source)

And even more, with the Pandas API on Spark, pandas transformations are available inside Spark itself. Moreover, any code that accepts and produces pandas DataFrames can be used within Spark via Pandas UDFs.
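For example, a plain pandas function can be wrapped into a Pandas UDF and executed in a distributed fashion. A minimal sketch (the column names and the conversion are made up for illustration; spark is the active SparkSession):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf


@pandas_udf("double")
def to_celsius(fahrenheit: pd.Series) -> pd.Series:
    # regular pandas code, executed in parallel on Spark partitions
    return (fahrenheit - 32) * 5.0 / 9.0


readings = spark.range(100).withColumn("temp_f", F.rand() * 100)
readings.select(to_celsius("temp_f").alias("temp_c")).show()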

Flexible scheduling

One important feature of Spark is its flexible API for launching ETL code under various processing-frequency requirements.

E.g. you might want to write your code once and make it useful both in batch and streaming, with the capability to easily switch between these behaviors.

Remember LaunchScheduler from the beginning? Let’s take a closer look at the potential design of this part:


class LaunchScheduler(BaseModel):
    once: Optional[bool]
    available_now: Optional[bool]
    processing_time: Optional[str]


class Pipe:
    def launch_with(self, scheduler_config):
        # do the basics first
        # _read_stream encapsulates spark.readStream.format(...).load(...)
        sources = [self._read_stream(source) for source in self.source_definitions]
        sources_transformed = [self._transform_source(source) for source in sources]
        joined = self._join_with_rule(sources_transformed, self.join_rule)

        # process inside each batch / micro-batch
        def batch_processor(df, epoch_id):
            df = self._post_join_transforms(df)
            self._write_to_destination(df)  # encapsulates the writing logic

        # map the config fields to Spark's trigger arguments
        # (once / availableNow / processingTime)
        trigger_kwargs = {
            spark_name: value
            for spark_name, value in {
                "once": scheduler_config.once,
                "availableNow": scheduler_config.available_now,
                "processingTime": scheduler_config.processing_time,
            }.items()
            if value is not None
        }

        query = (
            joined.writeStream
            .foreachBatch(batch_processor)
            .trigger(**trigger_kwargs)
            .start()
        )
        query.awaitTermination()

The brilliant part is that it becomes possible to write the ETL code once.

When a batch is required, it’s enough to use once or available_now (read here on the differences), and when a business requirement calls for faster SLAs, it’s easy to switch to other triggers without changing the code itself, by just changing the configuration.
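To make this concrete, switching between the two modes could look like this (a sketch based on the hypothetical LaunchScheduler above):

# one-off / scheduled batch run: process what is available, then stop
batch_schedule = LaunchScheduler(available_now=True)

# near-real-time run: trigger micro-batches every 30 seconds
streaming_schedule = LaunchScheduler(processing_time="30 seconds")

# the pipeline code stays exactly the same
pipe.launch_with(batch_schedule)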

Summary

Apache Spark as a framework allows users to build apps that can be easily and efficiently scaled. At the same time, the code of a data application can be domain-driven and flexibly scheduled, without the need to develop two versions of the code for batch and streaming use cases.

These capabilities combined allow developers to deliver their data applications in a future-proof way, without costly refactoring when requirements change or data grows.

Have you tried writing data applications with Apache Spark in a domain-driven design? Have an opinion? Feel free to share it in the comments! Also, hit subscribe if you liked the post — it keeps the author motivated to write more.



Ivan Trusov

Senior Specialist Solutions Architect @ Databricks. All opinions are my own.