r/apachespark

Netflix/wick: A zero cost type safe Apache Spark API
🔥 Hot ▲ 120 r/apachespark+1 crossposts

We have open sourced Wick, a zero-cost, type-safe Apache Spark API!

github.com
u/JoanG38 — 3 days ago
▲ 14 r/apachespark+1 crossposts

I maintain Apache Spark Connect for Golang so I added streaming and built a Data Lake ORM

Quick context for people who haven't touched it: Apache Spark Connect is the gRPC surface Spark exposes so you can run Spark SQL against a cluster without bundling a JVM into your app. The official Go client is apache/spark-connect-go, and I've been contributing upstream for a while. I shipped SPARK-52780, which adds streaming reads, so you can pull large result sets into your application code without OOMing, and build Go streaming systems on top of it.
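The streaming-read pattern itself is language-agnostic: the client consumes fixed-size batches, so peak memory stays at one batch rather than the full result set. A toy Python sketch of that consumption loop (this is not the Go client's API; `fetch_batches` is an invented stand-in for the server side):

```python
from typing import Iterator, List, Tuple

Row = Tuple[int, str]

def fetch_batches(total_rows: int, batch_size: int) -> Iterator[List[Row]]:
    """Stand-in for a server streaming results in fixed-size batches
    (Spark Connect streams Arrow record batches over gRPC in a similar spirit)."""
    for start in range(0, total_rows, batch_size):
        end = min(start + batch_size, total_rows)
        yield [(i, f"row-{i}") for i in range(start, end)]

def consume(total_rows: int, batch_size: int) -> int:
    """Handle one batch at a time: peak memory is one batch, not the result set."""
    seen = 0
    for batch in fetch_batches(total_rows, batch_size):
        seen += len(batch)  # real code would hand the batch to application logic
    return seen
```

The point is simply that nothing in the loop ever holds more than one batch.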

I've built successful products on my own fork of spark-connect-go against Databricks, so I thought it was worth sharing the fruit of my labour.

I also think a mindset that's catching on is using Spark for 'data contracts'. This works now because Spark Connect is push-based. Commit semantics have improved, so the technical reasons for a bronze layer existing are less justifiable, but we're still writing bronze layers because the pattern has calcified.

That is, much of the "dirty data" and "broken pipeline" work I get paid to fix as a contractor is janitorial cleanup of a landing zone that didn't need to exist in the first place. If you validate at the application boundary using the type system, you can write straight to silver, and the whole bronze tier becomes dead weight.
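The "validate at the application boundary" idea can be sketched in a few lines. This is a generic illustration in plain Python (a dataclass with invariants, not lake-orm's API; `Order` and `ingest` are invented names):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Order:
    """A typed record whose invariants are enforced at construction time."""
    order_id: str
    amount_cents: int

    def __post_init__(self) -> None:
        # Reject bad records at the application boundary instead of landing
        # them raw in a bronze tier and cleaning them up later.
        if not self.order_id:
            raise ValueError("order_id must be non-empty")
        if self.amount_cents < 0:
            raise ValueError("amount_cents must be non-negative")

def ingest(raw: dict) -> Order:
    """Raises before anything is written, so only valid rows reach 'silver'."""
    return Order(order_id=str(raw["order_id"]), amount_cents=int(raw["amount_cents"]))
```

If construction fails, nothing lands; the "dirty data in bronze" failure mode never happens.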

So one of the spin-out projects that came from this is lake-orm. The vision is to stop losing sleep over the same bugs. Many systems I've worked with that touch a data lake write the same struct-to-Parquet plumbing, the same ingestion code and validation glue, the same "oops, I realised my metadata is dirty and we need to clean it" fixes. Append, merge by key, then find out someone wrote something bad to 'bronze' that fails a data-quality check nobody really thought about.

In my mind the ORM should provide a batteries-included approach, which to me just means 'stop rewriting the same code for data pipelines and just declare the models you care about'. In most situations I've seen in the wild at mega-orgs, this pattern works. Teams really just want to define almost document-like object storage quickly; data quality matters more than the semantics, and where the semantics do count, what matters most is that they're clear and that the data is partitioned in a reasonable way.

Sometimes I've worked in 500-100B row systems where the big blocker to hitting the ground running on day one is just grokking the twenty-table join behind a concept nobody documented. So I want to shift future clients towards a contract-driven approach, which aligns better with the other half of my career building lean, typed data platforms (often in SQLC). I have fairly strong opinions as an engineer about this and I'm happy to answer any questions about my general thought process here.

So anyway, for the basic case: you provide Go structs tagged with spark:"..." and validators, and they become Iceberg or Delta tables on object storage. Writes go direct to silver via an object-storage fast path. Reads stream back with constant memory. Joins and aggregates use a CQRS-shaped output struct. The whole thing works with Databricks, and it's a not-for-profit passion project I thought was worth shouting about. I'm not asking you to use the ORM, or even to like it, but I'm really passionate about the job I do and I wanted to let you know it exists. Contributions are super welcome.

Both repos:

Let me know your thoughts on either project. Happy Coding!

EDIT: PS for the PySpark haters out there, I made DataFrames typed in a way analogous to Dataset.

u/Certain_Leader9946 — 2 days ago

TPC-DS benchmark as a measure of performance in Spark and similar engines - promo

I have been running the TPC-DS benchmark for performance measurement using Apache Spark and my fork. TPC-DS runs read-only SQL on prepared data, and that preparation is crucial to the numbers. Since TPC-DS puts no requirements on preparing the data, each vendor/engine fine-tunes it to its strengths. Nothing wrong there, but the cost of preparing the data is overlooked.

When preparing data for TPC-DS using the TPC-DS toolkit, the tables created are by default partitioned on the date column, unless the flag is explicitly set to false.

When I ran the TPC-DS benchmark on two m1 nodes at a 3 TB scale factor, generating the partitioned data took in excess of 6 hours, with sporadic OOMs.

The same 3 TB of data, generated without partitioning on the date column but sorted locally on the date column while writing each split, took around 40-50 minutes.
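The layout difference is easy to picture: partitioning by date fans each split out into one file per distinct date, while local sorting keeps one file per split (with min/max stats still usable for skipping). A toy Python sketch that counts output files rather than timing anything (names are illustrative, not the toolkit's code):

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int]  # (date, value)

def write_partitioned(split: List[Row]) -> Dict[str, List[Row]]:
    """Partition-by-date layout: one output file per distinct date per split,
    so wide date ranges fan out into many small files."""
    files: Dict[str, List[Row]] = defaultdict(list)
    for row in split:
        files[f"date={row[0]}/part.parquet"].append(row)
    return dict(files)

def write_locally_sorted(split: List[Row]) -> Dict[str, List[Row]]:
    """One file per split, sorted by date so min/max stats still allow skipping."""
    return {"part.parquet": sorted(split)}
```

With thousands of dates per split, the first layout's file count (and shuffle/commit overhead) explodes, which is where the hours and OOMs come from.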

The TPC-DS run times for partitioned stock Spark vs. the non-partitioned Spark fork were 2200 s and 2300 s respectively.

Another point is the relevance of the TPC-DS benchmark itself for Spark and related engines.

TPC-DS queries are straightforward SQL, while real-world queries using DataFrame APIs can be, and often are, extremely complex: so complex that they cannot even be represented as a single SQL string. Given the way one can join DataFrames or keep adding projections, a SQL-string representation, if it could be created at all, would have abnormally deep nesting, far beyond the 6-7 levels usually allowed by SQL databases, AFAIK.
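The nesting claim can be seen with a toy generator: each chained projection, forced into a single SQL string, adds one level of subquery nesting. A minimal Python sketch (no Spark involved; the names are illustrative):

```python
def add_projection(subquery_sql: str, exprs: str) -> str:
    """Each DataFrame-style .select() on top of an existing plan becomes
    one more layer of subquery nesting when forced into a SQL string."""
    return f"SELECT {exprs} FROM ({subquery_sql}) t"

def chained(depth: int) -> str:
    """Simulate a pipeline that keeps adding projections on top of a base table."""
    sql = "SELECT * FROM base"
    for i in range(depth):
        sql = add_projection(sql, f"col{i} + 1 AS col{i + 1}")
    return sql
```

A DataFrame program with dozens of chained selects and joins maps to dozens of nesting levels, which is exactly what TPC-DS's hand-written SQL never exercises.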

Are there any better benchmarks that take real-world usage into account?

reddit.com
u/ahshahid — 5 days ago
▲ 9 r/apachespark+1 crossposts

Why does Spark not introduce aggregation computation capability into ESS?

Spark has introduced map-side aggregation, so why not introduce aggregation on the external shuffle service (ESS) side as well? Intuitively, this could bring a significant performance boost to job execution.
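For anyone unfamiliar with the terms, the idea can be sketched conceptually: each map task already pre-aggregates its own records, and the hypothetical ESS-side step would merge those partial maps before reducers fetch them. A toy Python illustration of the two stages (purely conceptual, not Spark or ESS code):

```python
from collections import Counter
from typing import Dict, List

def map_side_partial(records: List[str]) -> Dict[str, int]:
    """Partial aggregation inside each map task (what Spark already does)."""
    return dict(Counter(records))

def shuffle_side_merge(partials: List[Dict[str, int]]) -> Dict[str, int]:
    """The hypothetical ESS-side step: merge partial maps from many mappers
    before reducers fetch them, shrinking the data shuffled downstream."""
    merged: Counter = Counter()
    for p in partials:
        merged.update(p)
    return dict(merged)
```

Merging N partial maps at the shuffle service would mean each reducer fetches one combined map instead of N.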

reddit.com
u/Pitiful-Victory9809 — 6 days ago

I Built a small library for DataFrame schema enforcement - dfguard. Would love to hear your thoughts

For any data engineer/SWE who works a lot with DataFrames: schema checks are boring but often necessary. I was looking at pandera for a small project but got annoyed that it has its own type system. If I'm writing PySpark, I already know pyspark.sql.types, so why should I learn pandera's equivalent? (A few libs follow this approach.) And libs like great_expectations felt like overkill.

I wanted something light that enforces schema checks at function-call time using the types I already use. And I did NOT want to call schema-validation functions explicitly everywhere - the project would end up peppered with them. A project-level setting should enable schema checks wherever the appropriate type annotation is present.

So I built dfguard (PyPI: https://pypi.org/project/dfguard/). It checks that a DataFrame passed to a function matches the expected schema, using whatever types your library already uses.

PySpark, pandas, and Polars are supported. It looks only at DataFrame schema metadata (not the data) and validates it at function-call time based on type annotations.
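For anyone curious how call-time schema enforcement from declared types can work in principle, here is a generic sketch. This is NOT dfguard's API: the `expect_schema` decorator, `FakeDF`, and the error type are all invented for illustration.

```python
import functools
import inspect
from typing import Any, Callable, Dict

class SchemaCheckError(TypeError):
    """Raised when a DataFrame argument's schema does not match the declaration."""

def expect_schema(**expected: Dict[str, str]) -> Callable:
    """Check named arguments' .schema ({column: dtype}) against a declared
    schema before the function body runs; extra columns are allowed."""
    def decorate(fn: Callable) -> Callable:
        sig = inspect.signature(fn)
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            bound = sig.bind(*args, **kwargs)
            for name, schema in expected.items():
                actual = getattr(bound.arguments[name], "schema", {})
                bad = {c: t for c, t in schema.items() if actual.get(c) != t}
                if bad:
                    raise SchemaCheckError(f"{name}: wrong or missing columns {bad}")
            return fn(*args, **kwargs)
        return wrapper
    return decorate

class FakeDF:
    """Stand-in for a real DataFrame: only schema metadata is inspected."""
    def __init__(self, schema: Dict[str, str]) -> None:
        self.schema = schema

@expect_schema(df={"id": "bigint", "name": "string"})
def head(df: FakeDF) -> FakeDF:
    return df
```

The check touches only metadata, never the rows, which is why this kind of validation is cheap enough to run on every call.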

Some things I enjoyed while building or learnt:

- If you have a packaged data pipeline, dfg.arm() in your package __init__.py covers every dfguard schema-annotated DataFrame argument. No decorator needed on each function.

- pandas was annoying - dtype is 'object' for strings, lists, dicts, everything. I ended up recommending `pd.ArrowDtype` for users who need precise nested types in pandas.

- Docs have examples for Airflow and Kedro if you're using those.

pip install 'dfguard[pandas]' pyarrow 
pip install 'dfguard[polars]' 
pip install 'dfguard[pyspark]'

This quickstart should cover everything for anyone who's interested in trying it out.

Curious to hear any thoughts, or if you'd like to see a new feature added. If you try it out, I'll be ecstatic.

Edit --

For anyone curious about how easy it is (the quickstart page has minimal examples for most things):

Only 3 things to do -

  1. Import the lib

  2. Declare the schema of your DataFrames with types compatible with your DataFrame library (there are 2 ways to do it depending on the circumstance). There is also a function that assigns an existing DataFrame's schema to a dfguard object so you can use it directly - though I wouldn't recommend that for data pipelines.

  3. Decorate the function if you're using notebooks/scripts, or, if you have a packaged data pipeline, include a line in your __init__.py. That's it!

Good practice: include schema.py files in your packages and import all the schemas for the DataFrames you want enforced.

By default, extra columns are allowed; subset=False in the "enforce" functions makes it strict.


Shameless plug: if you like the repo, consider starring it.

reddit.com
u/laserjoy — 10 days ago

Promo: Two quiet Spark optimizer inefficiencies fixed in our Spark fork (TabbyDB)

If you've ever run complex analytical queries on Spark with wide projections full of nested expressions, you may have been losing performance to two issues. I have seen these cause query compilation times to run into hours.

Problem 1: CollapseProject and duplicated subexpressions

Spark's CollapseProject rule merges a chain of Project nodes into one. That's generally good: smaller tree, simpler plan. But it can leave you with a single Project where multiple Alias expressions each independently evaluate the same expensive subexpression. Spark does have CSE logic to catch this, but it only works when there are multiple Project nodes to reason about. If your Project arrives already collapsed, Spark has nothing to split, and the duplicated work silently makes it into physical execution, evaluated redundantly for every single row.

Problem 2: Per-rule change tracking that barely helps

Spark tracks whether each optimizer rule actually modified the plan, so it can in theory skip unchanged rules on subsequent passes. In practice this gives almost no benefit, because the state is kept in the tree nodes: if any rule in a batch changes the tree, all the preceding nodes in the tree also get recreated, losing that state, and the whole batch restarts from rule 1, including rules that have nothing left to do. For expensive rules that do full plan-tree traversals (NullPropagation, ConstantFolding, etc.), that's a lot of wasted optimizer compilation time on large plans.

What we did in TabbyDB

We solve both with a single mechanism:

  1. Let CollapseProject fully flatten everything first — minimal tree, even if subexpressions are duplicated
  2. Then do a controlled expansion: detect replicated deterministic subexpressions and rewrite into a chain of projects, where each subexpression is computed exactly once and referenced by all consumers downstream
  3. Apply the expensive optimizer rules and other batch rules to these expanded blocks once, then mark them immutable
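The mechanics of step 2 can be sketched with toy expression trees: find a deterministic subexpression referenced by several output aliases, compute it once in a lower Project, and reference it by name above. A simplified Python illustration (not TabbyDB's code; one level of hoisting only, with expressions as plain tuples):

```python
from typing import Dict, Iterator, Tuple

Expr = Tuple  # ("col", name) or ("call", fn_name, *arg_exprs)

def subexprs(e: Expr) -> Iterator[Expr]:
    """Yield an expression and all of its nested subexpressions."""
    yield e
    if e[0] == "call":
        for arg in e[2:]:
            yield from subexprs(arg)

def hoist_common(outputs: Dict[str, Expr]) -> Tuple[Dict[str, Expr], Dict[str, Expr]]:
    """Find 'call' subexpressions used 2+ times across output aliases, compute
    each once in a lower Project, and reference it by name in the upper one."""
    counts: Dict[Expr, int] = {}
    for e in outputs.values():
        for s in subexprs(e):
            if s[0] == "call":
                counts[s] = counts.get(s, 0) + 1
    shared = sorted((s for s, n in counts.items() if n >= 2), key=str)
    lower = {f"_cse{i}": s for i, s in enumerate(shared)}
    rev = {s: name for name, s in lower.items()}

    def rewrite(e: Expr) -> Expr:
        if e in rev:
            return ("col", rev[e])
        if e[0] == "call":
            return ("call", e[1], *(rewrite(a) for a in e[2:]))
        return e

    upper = {name: rewrite(e) for name, e in outputs.items()}
    return lower, upper
```

The lower Project evaluates each shared expression once per row; every consumer in the upper Project just reads the column.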

The immutability guarantee is the key. On every subsequent batch iteration, those blocks are skipped entirely by the expensive rules, because we know they're fully normalized and nothing can change. The rest of the plan keeps being optimized normally; it doesn't matter if another rule elsewhere modifies the tree, since the immutable blocks are never re-traversed.

Result: less redundant computation at runtime (CSE actually works on already-collapsed projects), and less wasted optimizer time per batch iteration.

Only deterministic expressions qualify — rand(), now(), non-deterministic UDFs etc. are excluded. Query results are identical.

I had opened a ticket on this idea (and on a PushDownPredicates optimization, a separate perf issue):

https://issues.apache.org/jira/browse/SPARK-36786

I had this implemented long back, but never published the PR or ported the code to newer Spark versions. Now I have got it into KwikQuery's TabbyDB 4.1.1 release (will put it up as a downloadable in a day or 2), which is based on Apache Spark's 4.1.1 release.

Apart from that, TabbyDB 4.1.1 further improves the TPC-DS numbers from 14% to 17% over OSS Spark, as tested recently on 3 TB of data with 2 nodes, for non-partitioned tables.

reddit.com
u/ahshahid — 13 days ago

Spark FDE Engineer role @ Snowflake

The Spark team at Snowflake is hiring forward deployed engineers for their Spark offering (located in Menlo Park or Bellevue).

jobs.ashbyhq.com
u/holdenk — 7 days ago
🔥 Hot ▲ 82 r/apachespark+1 crossposts

I love Databricks Auto Loader, but I hate the Spark tax, so I built my own

I love Databricks Auto Loader.

But I don’t like:

  • paying the Spark tax
  • being locked into a cluster
  • spinning up distributed infra just to ingest files

So I built a simpler version that runs locally.

It’s called OpenAutoLoader — a Python library using Polars + delta-rs for incremental ingestion into Delta Lake.

Runs on a single node. No Spark. No cluster.

What it does:

  • Tracks ingestion state with SQLite → only processes new files
  • “Rescue mode” → unexpected columns go into _rescued_data instead of crashing
  • Adds audit columns automatically (_batch_id, _processed_at, _file_path)
  • Handles schema evolution (add / fail / rescue / ignore)
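The SQLite state-tracking idea is simple enough to sketch with the standard library alone (an illustration of the pattern, not OpenAutoLoader's actual code; `new_files` is an invented name):

```python
import sqlite3
from typing import Iterable, List

def new_files(conn: sqlite3.Connection, listed: Iterable[str]) -> List[str]:
    """Return only paths not seen by a previous run, and record them so the
    next run skips them: the core of incremental, Auto Loader-style ingestion."""
    conn.execute("CREATE TABLE IF NOT EXISTS ingested (path TEXT PRIMARY KEY)")
    seen = {row[0] for row in conn.execute("SELECT path FROM ingested")}
    fresh = [p for p in listed if p not in seen]
    conn.executemany("INSERT INTO ingested (path) VALUES (?)", [(p,) for p in fresh])
    conn.commit()
    return fresh
```

Because the state lives in a local SQLite file, re-running the ingestion job is idempotent with no cluster or checkpoint directory involved.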

Stack:
Polars (lazy) + delta-rs + pydantic + fsspec

Built it mainly because I wanted a lightweight lakehouse setup for local dev and smaller workloads.

Repo: https://github.com/nitish9413/open_auto_loader
Docs: https://nitish9413.github.io/open_auto_loader/

Would love feedback, especially from folks using Polars or trying to avoid Spark.

u/nitish94 — 16 days ago

Built a small tool to inspect Delta Lake pruning and data skipping. Could this be useful?

Hi! I’ve been working quite a bit with Delta tables lately, so I ended up building a small tool to better understand how partition pruning and data skipping actually work.
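For anyone unfamiliar, file-level data skipping boils down to comparing a predicate against per-file min/max statistics. A minimal Python sketch of that decision (illustrative only, not the tool's implementation):

```python
from typing import Dict, List, Tuple

FileStats = Dict[str, Tuple[int, int]]  # column -> (min, max) for one file

def can_skip(stats: FileStats, column: str, value: int) -> bool:
    """A file whose [min, max] range for `column` cannot contain `value`
    never needs to be read for the predicate `column = value`."""
    lo, hi = stats[column]
    return value < lo or value > hi

def files_to_read(files: Dict[str, FileStats], column: str, value: int) -> List[str]:
    """Prune the file list down to those whose stats overlap the predicate."""
    return [path for path, stats in files.items() if not can_skip(stats, column, value)]
```

Delta's transaction log stores exactly this kind of per-file statistics, which is what makes inspecting pruning decisions possible without reading the data.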

Not sure if this is useful beyond my own use case or if it’s just something I built to explore things a bit.

Would be curious to hear what others think. This is the link to the repo: https://github.com/cdelmonte-zg/delta-explain

u/LongjumpingOption523 — 12 days ago

Apache Spark 3.5.3 vs 4.1.0 — What actually changed, and should you migrate?

Java 8/11 support is dropped
Scala 2.12 support is dropped
Python 3.9 support is dropped

Should we really upgrade to 4.1.0, or just continue with Spark 3.5.x or lower?

For context, most production pipelines are probably still running on 3.5.x or lower.
Any thoughts?

reddit.com
u/DeeRockzz — 17 days ago
▲ 10 r/apachespark+1 crossposts

Beyond CSV & Parquet: What Real Data Ingestion in Spark Actually Looks Like

Most Spark tutorials focus on clean CSVs and Parquet files, but real-world data is rarely that simple. In this post, I share practical ingestion patterns and lessons learned from working with messy, unpredictable data in production.

medium.com
u/Expensive-Insect-317 — 15 days ago

Promo: KwikQuery's TabbyDB-4.1.1 available for download

The trial version of KwikQuery's TabbyDB-4.1.1, which is in 100% agreement with Spark 4.1.1, is available on the website for download.

The differences between TabbyDB 4.0.1 and 4.1.1 (from the perspective of enhancements done in TabbyDB) are:

  1. Logic for marking a chain of Projects as immutable, so that optimizer rules are applied only once to Projects containing huge/repetitive expressions, as described in the earlier post.

  2. Enhancement in the runtime performance of pushed-down broadcast keys of Broadcast Hash Joins, by minimizing the iterations over the pushed keys. (Note that this feature of pushing down the broadcast keys of a hash join is present only in TabbyDB.) The improvement in 4.1.1 vs. 4.0.1 has raised the overall TPC-DS benchmark gain from 14% to 17%, when compared with open-source Spark of the respective versions.

reddit.com
u/ahshahid — 6 days ago