안녕하세요. (주)씨앤텍시스템즈입니다.
이번에 Spark를 이용하여 DataFrame을 SparkSQL로 데이터 처리하는 방법을 알아봅니다.
SQL(Structured Query Language)
- 데이터베이스 상에서 저장되어 있는 데이터에 대해 질문을 정의하고 표현하는데 가장 일반적으로 사용되는 질의언어
- 관계형 데이터베이스 관리 시스템의 데이터를 관리하기 위해 설계된 특수 목적의 프로그래밍 언어
- 데이터베이스 관리자 또는 많은 데이터 분석가는 복잡한 비즈니스 문제를 데이터 조작 언어(DML, Data Manipulation Laguage)로 조회
Spark SQL
- 구조화 된 데이터 처리를 위한 Spark 모듈
- 기본 Spark RDD API와 달리 Spark SQL에서는 데이터의 구조와 수행중인 계산에 대한 자세한 정보를 제공
- 내부적으로 질의 최적화를 수행하여 사용자가 특별한 변경없이 최고의 성능을 낼 수 있도록 제공
- SQL 및 Dataset API를 포함하여 Spark SQL과 상호 작용하는 방법
- HiveQL 호환을 통한 Hadoop과의 통합 분석을 수행 가능
- S3나 타 저장소에 저장된 데이터에 대해 하이브 메타스토어 및 객채 정의와 함께 사용 할 수 있도록 설계
- Spark SQL은 빅데이터 저장 관련하여 관계형 액세스에 최적화 되어 있는 Parquet 형식의 파일을 기본적으로 지원하여 데이터의 접근성 또한 우수
Spark SQL 예시 1
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+
# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+
# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
Spark Dataframe에서 SQL과 관련된 select, filter, groupBy 와 같은 하위함수를 사용하여 조작 할 수 있지만 보통 사용하는 DML 형식으로 사용하기 위해서는 아래와 같이 createOrReplaceTempView, createGlobalTempView 등을 이용하여 Spark Session 상에서 특정 테이블 이름으로 등록해주면 사용 가능하다.
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
Spark SQL 예시 2
- Spark Data Frame을 Database Table 처럼 사용
In [1]:
import pandas as pd
In [2]:
pandf = pd.read_csv("data/Uber-Jan-Feb-FOIL.csv", header=0)
In [3]:
pandf.head()
Out[3]:
Spark Session 데이터 프레임 생성¶
In [1]:
uberDF = spark.read.csv("data/Uber-Jan-Feb-FOIL.csv", inferSchema=True, header=True)
In [5]:
spark.read.format("csv").load("data/Uber-Jan-Feb-FOIL.csv").show()
In [3]:
uberDF.createOrReplaceTempView("uber")
Spark SQL SELECT¶
In [4]:
spark_select = spark.sql("select * from uber limit 10").show()
SELECT column limit¶
In [9]:
spark.sql("select date, dispatching_base_number from uber limit 10").show()
SELECT DISTINCT¶
In [7]:
spark.sql("select distinct dispatching_base_number from uber").show()
WHERE¶
In [8]:
spark.sql("SELECT count(*) from uber where trips > 2000").show()
distinct, sum, group by, order by¶
In [11]:
spark.sql(""" select distinct dispatching_base_number, sum(trips) tripsum
from uber
group by dispatching_base_number
order by tripsum desc
""").show()
In [13]:
spark.sql(""" select distinct date, sum(trips) tripsum
from uber
group by date
order by tripsum desc
limit 10
""").show()
between¶
In [15]:
#범위
spark.sql("select * from uber where trips between 1000 and 2000 limit 10").show()
In [ ]:
참고 - https://spark.apache.org/docs/latest/sql-getting-started.html
728x90
'Data > Bigdata' 카테고리의 다른 글
Selenium을 이용한 인스타그램 크롤링 (3) | 2020.06.11 |
---|---|
Spark을 이용한 Deeplearning (0) | 2020.06.11 |
Spark DataFrame (PySpark) (0) | 2020.04.20 |
R을 이용한 Bioinformatics (Bioconductor) (1) | 2020.04.20 |
Apache Spark 기능 (0) | 2020.02.13 |