Data/Bigdata

Spark SQL(Pyspark)

안녕하세요. (주)씨앤텍시스템즈입니다.

 

이번에 Spark를 이용하여 DataFrame을 SparkSQL로 데이터 처리하는 방법을 알아봅니다.


SQL(Structured Query Language)

  • 데이터베이스 상에서 저장되어 있는 데이터에 대해 질문을 정의하고 표현하는데 가장 일반적으로 사용되는 질의언어
  • 관계형 데이터베이스 관리 시스템의 데이터를 관리하기 위해 설계된 특수 목적의 프로그래밍 언어
  • 데이터베이스 관리자 또는 많은 데이터 분석가는 복잡한 비즈니스 문제를 데이터 조작 언어(DML, Data Manipulation Laguage)로 조회

Spark SQL

  • 구조화 된 데이터 처리를 위한 Spark 모듈
  • 기본 Spark RDD API와 달리 Spark SQL에서는 데이터의 구조와 수행중인 계산에 대한 자세한 정보를 제공
  • 내부적으로 질의 최적화를 수행하여 사용자가 특별한 변경없이 최고의 성능을 낼 수 있도록 제공
  • SQL 및 Dataset API를 포함하여 Spark SQL과 상호 작용하는 방법
    • HiveQL 호환을 통한 Hadoop과의 통합 분석을 수행 가능
    • S3나 타 저장소에 저장된 데이터에 대해 하이브 메타스토어 및 객채 정의와 함께 사용 할 수 있도록 설계
  • Spark SQL은 빅데이터 저장 관련하여 관계형 액세스에 최적화 되어 있는 Parquet 형식의 파일을 기본적으로 지원하여 데이터의 접근성 또한 우수

Spark SQL 예시 1

# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+

 Spark Dataframe에서 SQL과 관련된 select, filter, groupBy 와 같은 하위함수를 사용하여 조작 할 수 있지만 보통 사용하는 DML 형식으로 사용하기 위해서는 아래와 같이 createOrReplaceTempView, createGlobalTempView 등을 이용하여 Spark Session 상에서 특정 테이블 이름으로 등록해주면 사용 가능하다.

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

Spark SQL 예시 2

 
  • Spark Data Frame을 Database Table 처럼 사용
In [1]:
import pandas as pd
In [2]:
pandf = pd.read_csv("data/Uber-Jan-Feb-FOIL.csv", header=0)
In [3]:
pandf.head()
Out[3]:
  dispatching_base_number date active_vehicles trips
0 B02512 1/1/2015 190 1132
1 B02765 1/1/2015 225 1765
2 B02764 1/1/2015 3427 29421
3 B02682 1/1/2015 945 7679
4 B02617 1/1/2015 1228 9537
 

Spark Session 데이터 프레임 생성

In [1]:
uberDF = spark.read.csv("data/Uber-Jan-Feb-FOIL.csv", inferSchema=True, header=True)
In [5]:
spark.read.format("csv").load("data/Uber-Jan-Feb-FOIL.csv").show()
 
+--------------------+--------+---------------+-----+
|                 _c0|     _c1|            _c2|  _c3|
+--------------------+--------+---------------+-----+
|dispatching_base_...|    date|active_vehicles|trips|
|              B02512|1/1/2015|            190| 1132|
|              B02765|1/1/2015|            225| 1765|
|              B02764|1/1/2015|           3427|29421|
|              B02682|1/1/2015|            945| 7679|
|              B02617|1/1/2015|           1228| 9537|
|              B02598|1/1/2015|            870| 6903|
|              B02598|1/2/2015|            785| 4768|
|              B02617|1/2/2015|           1137| 7065|
|              B02512|1/2/2015|            175|  875|
|              B02682|1/2/2015|            890| 5506|
|              B02765|1/2/2015|            196| 1001|
|              B02764|1/2/2015|           3147|19974|
|              B02765|1/3/2015|            201| 1526|
|              B02617|1/3/2015|           1188|10664|
|              B02598|1/3/2015|            818| 7432|
|              B02682|1/3/2015|            915| 8010|
|              B02512|1/3/2015|            173| 1088|
|              B02764|1/3/2015|           3215|29729|
|              B02512|1/4/2015|            147|  791|
+--------------------+--------+---------------+-----+
only showing top 20 rows

In [3]:
    uberDF.createOrReplaceTempView("uber")
 

Spark SQL SELECT

In [4]:
spark_select = spark.sql("select * from uber limit 10").show()
 
+-----------------------+--------+---------------+-----+
|dispatching_base_number|    date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
|                 B02512|1/1/2015|            190| 1132|
|                 B02765|1/1/2015|            225| 1765|
|                 B02764|1/1/2015|           3427|29421|
|                 B02682|1/1/2015|            945| 7679|
|                 B02617|1/1/2015|           1228| 9537|
|                 B02598|1/1/2015|            870| 6903|
|                 B02598|1/2/2015|            785| 4768|
|                 B02617|1/2/2015|           1137| 7065|
|                 B02512|1/2/2015|            175|  875|
|                 B02682|1/2/2015|            890| 5506|
+-----------------------+--------+---------------+-----+

 

SELECT column limit

In [9]:
spark.sql("select date, dispatching_base_number from uber limit 10").show()
 
+--------+-----------------------+
|    date|dispatching_base_number|
+--------+-----------------------+
|1/1/2015|                 B02512|
|1/1/2015|                 B02765|
|1/1/2015|                 B02764|
|1/1/2015|                 B02682|
|1/1/2015|                 B02617|
|1/1/2015|                 B02598|
|1/2/2015|                 B02598|
|1/2/2015|                 B02617|
|1/2/2015|                 B02512|
|1/2/2015|                 B02682|
+--------+-----------------------+

 

SELECT DISTINCT

In [7]:
spark.sql("select distinct dispatching_base_number from uber").show()
 
+-----------------------+
|dispatching_base_number|
+-----------------------+
|                 B02512|
|                 B02598|
|                 B02682|
|                 B02765|
|                 B02617|
|                 B02764|
+-----------------------+

 

WHERE

In [8]:
spark.sql("SELECT count(*) from uber where trips > 2000").show()
 
+--------+
|count(1)|
+--------+
|     284|
+--------+

 

distinct, sum, group by, order by

In [11]:
spark.sql(""" select distinct dispatching_base_number, sum(trips) tripsum
              from uber
              group by dispatching_base_number
              order by tripsum desc
          """).show()
 
+-----------------------+-------+
|dispatching_base_number|tripsum|
+-----------------------+-------+
|                 B02764|1914449|
|                 B02617| 725025|
|                 B02682| 662509|
|                 B02598| 540791|
|                 B02765| 193670|
|                 B02512|  93786|
+-----------------------+-------+

In [13]:
spark.sql(""" select distinct date, sum(trips) tripsum
              from uber
              group by date
              order by tripsum desc
              limit 10
          """).show()
 
+---------+-------+
|     date|tripsum|
+---------+-------+
|2/20/2015| 100915|
|2/14/2015| 100345|
|2/21/2015|  98380|
|2/13/2015|  98024|
|1/31/2015|  92257|
|2/15/2015|  89401|
|2/27/2015|  88806|
|2/19/2015|  88757|
|2/28/2015|  88181|
| 2/6/2015|  85940|
+---------+-------+

 

between

In [15]:
#범위
spark.sql("select * from uber where trips between 1000 and 2000 limit 10").show()
 
+-----------------------+--------+---------------+-----+
|dispatching_base_number|    date|active_vehicles|trips|
+-----------------------+--------+---------------+-----+
|                 B02512|1/1/2015|            190| 1132|
|                 B02765|1/1/2015|            225| 1765|
|                 B02765|1/2/2015|            196| 1001|
|                 B02765|1/3/2015|            201| 1526|
|                 B02512|1/3/2015|            173| 1088|
|                 B02765|1/5/2015|            227| 1133|
|                 B02765|1/6/2015|            234| 1376|
|                 B02512|1/6/2015|            218| 1314|
|                 B02765|1/7/2015|            248| 1704|
|                 B02512|1/7/2015|            217| 1446|
+-----------------------+--------+---------------+-----+

In [ ]:
 
 
 

참고 - https://spark.apache.org/docs/latest/sql-getting-started.html

 

Getting Started - Spark 2.4.5 Documentation

You are using an outdated browser. Upgrade your browser today or install Google Chrome Frame to better experience this site. Overview Programming Guides API Docs Deploying More v2.4.5 -->

spark.apache.org

 

728x90

'Data > Bigdata' 카테고리의 다른 글

Selenium을 이용한 인스타그램 크롤링  (3) 2020.06.11
Spark을 이용한 Deeplearning  (0) 2020.06.11
Spark DataFrame (PySpark)  (0) 2020.04.20
R을 이용한 Bioinformatics (Bioconductor)  (1) 2020.04.20
Apache Spark 기능  (0) 2020.02.13