Performance Troubleshooting Using Apache Spark Metrics
Luca Canali, CERN

About Luca
- Data Engineer at CERN: Hadoop and Spark service, database services
- 19+ years of experience with data engineering
- Sharing and community: blog, notes, tools, contributions to Apache Spark
- @LucaCanaliDB - http://cern.ch/canali

CERN
- Founded in 1954: 12 European States; "Science for Peace and Development"
- Today: 23 Member States
- ~2600 staff, ~1800 other paid personnel, ~14000 scientific users
- Budget (2019): ~1200 MCHF
- Member States: Austria, Belgium, Bulgaria, Czech Republic, Denmark, Finland, France, Germany, Greece, Hungary, Israel, Italy, Netherlands, Norway, Poland, Portugal, Romania, Serbia, Slovak Republic, Spain, Sweden, Switzerland and United Kingdom
- Associate Members in the Pre-Stage to Membership: Cyprus, Slovenia
- Associate Member States: India, Lithuania, Pakistan, Turkey, Ukraine
- Applications for Membership or Associate Membership: Brazil, Croatia, Estonia
- Observers to Council: Japan, Russia, United States of America; European Union, JINR and UNESCO

Data at the Large Hadron Collider
- LHC experiments data: >300 PB
- Computing jobs on the WLCG Grid: using ~1M cores

Analytics Platform @CERN

- Big Data open source components
- Integrated with domain-specific software and existing infrastructure
- Users in: Physics, Accelerators, IT
(diagram: experiments storage, HDFS, HEP software, personal storage)

Hadoop and Spark Clusters at CERN
- Spark running on clusters: YARN/Hadoop and Spark on Kubernetes

- Accelerator logging (part of LHC infrastructure): Hadoop - YARN, 30 nodes (cores: 1200, memory: 13 TB, storage: 7.5 PB)
- General purpose: Hadoop - YARN, 65 nodes (cores: 2.2k, memory: 20 TB, storage: 12.5 PB)
- Cloud containers: Kubernetes on OpenStack VMs (cores: 250, memory: 2 TB); storage: remote HDFS or EOS (for physics data)

Notebooks: Text, Code, Monitoring, Visualizations
- Sparkmonitor -> Jupyter extension for Spark monitoring, developed as a GSoC project with CERN
  https://medium.com/@krishnanr/sparkmonitor-big-data-tools-for-physics-analysis-bbcdef68b35a

Performance Troubleshooting
- Goals: improving productivity, reducing resource usage and cost
- Metrics: latency, throughput, cost
- How: practice and methodologies; gather performance and workload data

Performance Methodologies and Anti-Patterns
(Figure: a "Vendor A benchmark" bar chart of time in minutes for System A vs. System B, claiming "System A is 5x faster!")
- Anti-patterns: just a simple measurement, no root-cause analysis, guesses and generalization
- Sound methodologies: http://www.brendangregg.com/methodology.html

Workload and Performance Data
You want data to find answers to questions like:

- What is my workload doing? Where is it spending time?
- What are the bottlenecks (CPU, I/O)?
- How are system resources used?
- Why do I measure the {latency/throughput} that I measure? Why is it not 10x better?

Data + Context => Insights
- Workload monitoring data + knowledge of the Spark architecture
- Info on the application and info on the computing environment
- The agent takes these and produces: insights + actions

Measuring Spark
- Distributed system, parallel architecture
- Many components; complexity increases when running at scale
- Execution hierarchy: SQL -> Jobs -> Stages -> Tasks
- Interaction with clusters and storage

Spark Instrumentation - WebUI
- WebUI and History Server: standard instrumentation
- Details on jobs, stages, tasks
- Default: http://driver_host:4040
- Details on SQL execution and execution plans
- https://github.com/apache/spark/blob/master/docs/web-ui.md

Spark Instrumentation - Metrics
- Task metrics: instrument resource usage by executor tasks:

  - Time spent executing tasks, task CPU used, I/O metrics, shuffle read/write details, ..
  - SPARK-25170: https://spark.apache.org/docs/latest/monitoring.html
- SQL metrics: instrument DataFrame/SQL operations; mostly used by the Web UI SQL tab. See SPARK-28935 and the Web UI documentation

How to Gather Spark Task Metrics
- The Web UI exposes a REST API; example endpoint: http://localhost:4040/api/v1/applications (a sketch of reading it follows below)
- The History Server reads from the Event Log (a JSON file):
  spark.eventLog.enabled=true
  spark.eventLog.dir=...
- Programmatic interface via Spark Listeners
- sparkMeasure -> a tool and working example code of how to collect metrics with Spark Listeners
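A minimal sketch of reading the REST API from Scala (assuming a driver running locally with its UI on port 4040, as in the example endpoint above):

  // Fetch the list of applications known to the driver's REST API
  import scala.io.Source
  val appsJson = Source.fromURL("http://localhost:4040/api/v1/applications").mkString
  println(appsJson)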

Spark Metrics in REST API
(screenshot of the JSON output of the REST API)

Task Metrics in the Event Log

  val df = spark.read.json("/var/log/spark-history/application_1567507314781_..")
  df.filter("Event='SparkListenerTaskEnd'").select("Task Metrics.*").printSchema

 |-- Disk Bytes Spilled: long (nullable = true)
 |-- Executor CPU Time: long (nullable = true)
 |-- Executor Deserialize CPU Time: long (nullable = true)
 |-- Executor Deserialize Time: long (nullable = true)
 |-- Executor Run Time: long (nullable = true)
 |-- Input Metrics: struct (nullable = true)
 |    |-- Bytes Read: long (nullable = true)
 |    |-- Records Read: long (nullable = true)
 |-- JVM GC Time: long (nullable = true)
 |-- Memory Bytes Spilled: long (nullable = true)
 |-- Output Metrics: struct (nullable = true)
 |    |-- Bytes Written: long (nullable = true)
 |    |-- Records Written: long (nullable = true)
 |-- Result Serialization Time: long (nullable = true)
 |-- Result Size: long (nullable = true)
 |-- Shuffle Read Metrics: struct (nullable = true)
 |    |-- Fetch Wait Time: long (nullable = true)
 |    |-- Local Blocks Fetched: long (nullable = true)
 |    |-- Local Bytes Read: long (nullable = true)
 |    |-- Remote Blocks Fetched: long (nullable = true)
 |    |-- Remote Bytes Read: long (nullable = true)
 |    |-- Remote Bytes Read To Disk: long (nullable = true)
 |    |-- Total Records Read: long (nullable = true)
 |-- Shuffle Write Metrics: struct (nullable = true)
 |    |-- Shuffle Bytes Written: long (nullable = true)
 |    |-- Shuffle Records Written: long (nullable = true)
 |    |-- Shuffle Write Time: long (nullable = true)
 |-- Updated Blocks: array (nullable = true)
 |    |-- element: string (containsNull = true)
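As a small worked example, reusing df from above, one can aggregate these task-level metrics straight from the event log (a sketch; note that units differ across fields: run time is in ms, CPU time in ns):

  // Sum executor run time and executor CPU time over all tasks
  val tasks = df.filter("Event='SparkListenerTaskEnd'").select("Task Metrics.*")
  tasks.selectExpr("sum(`Executor Run Time`)", "sum(`Executor CPU Time`)").show()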

Spark Internal Task Metrics
- Provide info on executors' activity: run time, CPU time used, I/O metrics, JVM garbage collection, shuffle activity, etc.

Spark Listeners, @DeveloperApi
- Write a custom class that extends SparkListener
- Its methods react on events to collect data
- Attach the custom listener class to a Spark Session with --conf spark.extraListeners=..
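For illustration, a minimal sketch of such a listener (the class name is made up; attach it with --conf spark.extraListeners=<your.package>.RunTimeListener):

  import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

  // Minimal sketch: accumulate the executor run time (ms) of every
  // completed task; read totalRunTime at the end of the application.
  class RunTimeListener extends SparkListener {
    @volatile var totalRunTime: Long = 0L

    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      val metrics = taskEnd.taskMetrics
      if (metrics != null) {
        totalRunTime += metrics.executorRunTime
      }
    }
  }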

SparkMeasure Architecture
(diagram: sparkMeasure collects metrics via Spark Listeners)

SparkMeasure - Getting Started

  bin/spark-shell --packages ch.cern.sparkmeasure:sparkmeasure_2.11:0.15

  val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
  val myQuery = "select count(*) from range(1000) cross join range(1000) cross join range(1000)"
  stageMetrics.runAndMeasure(spark.sql(myQuery).show())

SparkMeasure Output Example

  Scheduling mode = FIFO
  Spark Context default degree of parallelism = 8
  Aggregated Spark stage metrics:
  numStages => 3
  sum(numTasks) => 17
  elapsedTime => 9103 (9 s)
  sum(stageDuration) => 9027 (9 s)
  sum(executorRunTime) => 69238 (1.2 min)
  sum(executorCpuTime) => 68004 (1.1 min)
  sum(executorDeserializeTime) => 1031 (1 s)
  sum(executorDeserializeCpuTime) => 151 (0.2 s)
  sum(resultSerializationTime) => 5 (5 ms)
  sum(jvmGCTime) => 64 (64 ms)
  sum(shuffleFetchWaitTime) => 0 (0 ms)
  sum(shuffleWriteTime) => 26 (26 ms)
  max(resultSize) => 17934 (17.0 KB)
  sum(numUpdatedBlockStatuses) => 0
  sum(diskBytesSpilled) => 0 (0 Bytes)
  sum(memoryBytesSpilled) => 0 (0 Bytes)
  max(peakExecutionMemory) => 0
  sum(recordsRead) => 2000
  sum(bytesRead) => 0 (0 Bytes)
  sum(recordsWritten) => 0
  sum(bytesWritten) => 0 (0 Bytes)
  sum(shuffleTotalBytesRead) => 472 (472 Bytes)
  sum(shuffleTotalBlocksFetched) => 8
  sum(shuffleLocalBlocksFetched) => 8
  sum(shuffleRemoteBlocksFetched) => 0
  sum(shuffleBytesWritten) => 472 (472 Bytes)
  sum(shuffleRecordsWritten) => 8

SparkMeasure - Usage Modes
- Interactive: use from shell or notebooks; works with Jupyter notebooks, Azure, Colab, Databricks, etc.
- Use to instrument your code
- Flight recorder mode: no changes needed to the code; for troubleshooting, for CI/CD pipelines
- Use with Scala, Python, Java
- https://github.com/LucaCanali/sparkMeasure

Instrument Code with SparkMeasure
https://github.com/LucaCanali/sparkMeasure/blob/master/docs/Instrument_Python_code.md
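A minimal sketch in Scala (the Python API is analogous; see the doc linked above), using sparkMeasure's begin/end API to instrument a section of code:

  // Instrument a code section with sparkMeasure, then print the report
  val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
  stageMetrics.begin()
  spark.sql("select count(*) from range(1000) cross join range(1000)").show()
  stageMetrics.end()
  stageMetrics.printReport()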

SparkMeasure on Notebooks: Local Jupyter and Cloud Services
https://github.com/LucaCanali/sparkMeasure/tree/master/examples

SparkMeasure on Notebooks: Jupyter magic: %%sparkmeasure
(note: output truncated to fit in the slide)

SparkMeasure as Flight Recorder
- Capture metrics and write them to files when the application finishes
- Monitoring option: write to InfluxDB on the fly
- A launch sketch follows below.
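A minimal sketch of launching the flight recorder (the listener class and the spark.sparkmeasure.* parameter names follow the sparkMeasure documentation; check the repository for the version you use, and myApp.jar is a placeholder):

  bin/spark-submit \
    --packages ch.cern.sparkmeasure:sparkmeasure_2.11:0.15 \
    --conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderStageMetrics \
    --conf spark.sparkmeasure.outputFilename="/tmp/stageMetrics.serialized" \
    myApp.jar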

Spark Metrics System
- Spark is also instrumented using the Dropwizard/Codahale metrics library
- Multiple sources (data providers):
  - Various instrumentation points in the Spark code, including task metrics, scheduler, etc.
  - Instrumentation from the JVM
- Multiple sinks: Graphite (InfluxDB), JMX, HTTP, CSV, etc.

Ingredients for a Spark Performance Dashboard
- Architecture:

  - Know how the Dropwizard metrics system works
  - Which Spark components are instrumented
- Configure backend components:
  - InfluxDB and Grafana
  - Relevant Spark configuration parameters
- Dashboard graphs:
  - Familiarize with available metrics
  - InfluxDB query building for dashboard graphs

Spark Performance Dashboard
(screenshot of the dashboard)

Send Spark Metrics to InfluxDB
- Edit $SPARK_HOME/conf/metrics.properties
- Alternative: use the configuration parameters spark.metrics.conf.*

  $SPARK_HOME/bin/spark-shell \
    --conf "spark.metrics.conf.driver.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink" \
    --conf "spark.metrics.conf.executor.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink" \
    --conf "spark.metrics.conf.*.sink.graphite.host"="<graphiteEndPoint_influxDB_hostName>" \
    --conf "spark.metrics.conf.*.sink.graphite.port"=<port> \
    --conf "spark.metrics.conf.*.sink.graphite.period"=10 \
    --conf "spark.metrics.conf.*.sink.graphite.unit"=seconds \
    --conf "spark.metrics.conf.*.sink.graphite.prefix"="lucatest" \
    --conf "spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

Assemble Dashboard Components
- Metrics written from Spark to InfluxDB:
  - Configuration of a Graphite endpoint in influxdb.conf (a sketch follows below)
  - Templates: how to ingest Spark metrics into InfluxDB series
  - https://github.com/LucaCanali/Miscellaneous/tree/master/Spark_Dashboard
- Grafana graphs built using data queried from InfluxDB
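A minimal sketch of the Graphite section of influxdb.conf (InfluxDB 1.x; the template line is illustrative only, and the repository above has the actual templates used to map Spark metric names to InfluxDB series):

  [[graphite]]
    enabled = true
    bind-address = ":2003"
    database = "graphite"
    protocol = "tcp"
    templates = [
      # illustrative placeholder; see the Spark_Dashboard repo for real templates
      "*.*.jvm.*.* username.applicationid.process.namespace.measurement*"
    ]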

- Get started: import an example dashboard definition
- Kubernetes users: a helm chart to automate the configuration: https://github.com/cerndb/spark-dashboard

Grafana Dashboard
- Summaries of key metrics
- Graphs for drill-down analysis

Spark Dashboard - Examples
Graph: number of active tasks vs. time

- Is Spark using all the available resources/cores?
- Are there time ranges with significant gaps?
- Identify possible issues: long tails, stragglers, data skew

Dashboard - I/O Metrics
Graph: HDFS read throughput vs. time

Dashboard - Memory
- Graphs of JVM memory usage: heap, off-heap, executors and driver

Dashboard - Executor CPU Utilization
Graph: CPU utilization by the executors' JVMs vs. time

- Total JVM CPU = CPU used by tasks + CPU used by GC

Task Time Drill-Down, by Activity
Graph: task total run time, with drill-down by component: CPU, wait time, garbage collection, etc.
- Investigation: CPU bound? Impact of GC? I/O time? Other time?

Graph Annotations
- Improvement:

  - Mark SQL/job/stage begin and end timestamps on the graphs
- Implementation:
  - sparkMeasure collects query/job begin and end timestamps and writes them to InfluxDB
  - Grafana renders them as annotations

Spark Dashboard - Lessons Learned
- Very useful to search for bottlenecks:
  - Many instrumented components
  - Drill down into where time is spent
  - Time evolution details: time series of the number of active tasks, CPU, I/O, memory, etc.
- Effort: you still have to understand the root causes
  - Use data to build models, then prove or disprove them

- The instrumentation is still evolving: for example, I/O time is not measured directly, Python UDF time is not covered, etc.

WIP: How to Measure I/O Time?
- Goal: how much of the workload time is spent doing I/O (reading)?
- Executor run time includes wait-time components that are not instrumented:
  - Apache Spark does not instrument I/O time
  - The Apache Hadoop Filesystem API does not measure I/O time
- Experimenting:

  - Added I/O read-time instrumentation for HDFS and S3A to a sandbox Hadoop fork
  - Exported the metrics using Spark executor plugins (SPARK-28091)
(Graph: green bars show the measured HDFS read time.)

Executor Plugins Extend Metrics
- User-defined executor metrics: SPARK-28091, target Spark 3.0.0
- Example: add I/O metrics for the s3a filesystem:

  bin/spark-shell --jars /sparkexecutorplugins_2.12-0.1.jar \
    --conf spark.executor.plugins=ch.cern.ExecutorPluginScala.S3AMetrics27

- https://github.com/cerndb/SparkExecutorPlugins
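For illustration, a minimal sketch of a plugin that registers a custom executor metric, written against the Spark 3.x plugin API (org.apache.spark.api.plugin, where this functionality eventually landed; there it is enabled with --conf spark.plugins=... rather than spark.executor.plugins). The class and metric names are made up:

  import java.util.{Map => JMap}
  import com.codahale.metrics.Gauge
  import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

  // Minimal sketch: register a custom executor metric with the
  // Dropwizard registry. The gauge returns a dummy value; real
  // instrumentation (e.g. filesystem read time) would go there.
  class MyMetricsPlugin extends SparkPlugin {
    override def driverPlugin(): DriverPlugin = null

    override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
      override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
        ctx.metricRegistry.register("myCustomMetric", new Gauge[Long] {
          override def getValue: Long = 42L // placeholder measurement
        })
      }
    }
  }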

Metrics from OS Monitoring
- Very useful to also collect OS-based metrics:
  - Hadoop: dashboard with HDFS and YARN metrics
  - OS host metrics: collectd, Ganglia
  - Kubernetes: Prometheus-based monitoring and dashboards

Notable JIRAs about Metrics
- Documentation improvements on Spark monitoring:
  - Master: SPARK-26890, Add Dropwizard metrics list and configuration details
  - Spark 2.4.0: SPARK-25170, Add Task Metrics description to the documentation
- Master: SPARK-23206, Additional Memory Tuning Metrics
- Master: SPARK-29064, Add Prometheus endpoint for executor metrics
- WIP: SPARK-27189, Add memory usage metrics to the metrics system
- Master: SPARK-28091, Extend Spark metrics system with user-defined metrics using executor plugins
- Master: SPARK-28475, Add regex MetricFilter to GraphiteSink
- CPU time used by the JVM:
  - Spark 2.4.0: SPARK-25228, Add executor CPU Time metric
  - Master: SPARK-26928, Add driver CPU Time to the metrics system
- Spark 2.3.0: SPARK-22190, Add Spark executor task metrics to Dropwizard metrics

Conclusions
- Performance troubleshooting by understanding Spark architecture + Spark instrumentation:
  - Web UI
  - Spark task metrics + listeners; tool: sparkMeasure
  - Spark Metrics System; tool: Grafana dashboard
- Instrumentation + context => insights + actions

- Contribute, adopt, share:
  - Instrumentation in the Spark ecosystem keeps improving
  - Solid methodologies and tools are key
  - Share your results, tools, issues, dashboards..

Acknowledgements
- Colleagues at CERN, in the Hadoop and Spark service, in particular Prasanth Kothuri and Riccardo Castellotti
- Thanks to the Apache Spark committers and community for help with JIRAs and PRs

References:
- https://github.com/LucaCanali/sparkmeasure
- https://db-blog.web.cern.ch/blog/luca-canali/2019-02-performance-dashboard-apache-spark
