안녕하세요 씨앤텍시스템즈입니다.
이번 포스팅은 이전 포스팅인 Apache Spark란?에 이어서 Spark 기능에 대해서 살펴보겠습니다.
Apache Spark의 다양한 기능 중 메인 기능인 아래 3가지를 주로 살펴보겠습니다.
▷ Spark SQL / Spark DataFrame
▷ Spark Streaming
▷ Spark ML
1. Spark SQL / DataFrame
먼저 DataFrame은 Spark에서 데이터를 Excel과 같은 스프레드시트 형태로 로드하여 처리 할 수 있는 기능입니다. 흔히, Python의 Pandas패키지의 DataFrame과 R에서의 DataFrame와 동일한 형태와 처리방법이라고 생각 할 수 있습니다.
다양한 데이터 형식(JSON, CSV, TEXT 등)과 데이터저장소(Local, HDFS, HBase, AWS 등)에서 데이터를 로드하여 DataFrame으로 생성 할 수 있고, 데이터 처리에 필요한 다양한 서브 함수를 제공합니다.
<DataFrame 예시>
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
Spark SQL은 Spark DataFrame으로 생성된 데이터를 Ansi-SQL의 형태로 질의를 할 수 있는 기능입니다. 아래와 같이 TempView로 DataFrame을 등록하여 spark.sql() 함수를 이용하여 데이터를 SQL로 처리 할 수 있습니다.
<Spark SQL 예시>
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
2. Spark Streaming
Spark에서 실시간 처리를 처리 할 수 있는 Streaming 기능을 제공합니다. 즉, 지속적으로 유입되는 데이터에 대해 유연하게 대처 할 수 있게 RDD기반으로 처리 할 수 있습니다.
또한, Spark 2에서는 Structured Streaming이라는 새로운 Streaming 방식을 지원하는데, 데이터를 RDD 기반이 아닌 DataFrame 형식으로 Streaming 데이터를 처리 할 수 있게 지원하는데 이는 DataFrame의 기능과 SQL을 동시에 사용하여 처리 할 수 있음을 말합니다.
<Spark Structured Streaming 예시>
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
3. Spark ML
Spark에서는 데이터과학에서는 필수인 Machine Learning 기능도 제공하고 있습니다. Mllib과 ML이라는 2가지 기능을 제공하고 있으며, Mllib은 RDD기반/ML은 DataFrame 기반으로 처리 할 수 있습니다.
지도학습 알고리즘(회귀, 분류)과 비지도 학습 알고리즘(군집 등)을 지원하며, 데이터 입력 시에는 Spark에서 지원하는 Vector형식으로 변형하여 사용합니다.
<Linear Regression 예시>
from pyspark.ml.regression import LinearRegression
# Load training data
training = spark.read.format("libsvm")\
.load("data/mllib/sample_linear_regression_data.txt")
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
<RandomForest 예시>
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
labels=labelIndexer.labels)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
rfModel = model.stages[2]
print(rfModel) # summary only
이상으로 Spark의 기능에 대해서 전반적으로 살펴 보았습니다.
빅데이터 기술이라면 절대 빼놓지 않고 사용되는 Spark인 만큼, 사용해 보시길 권장드립니다.
감사합니다.
'Data > Bigdata' 카테고리의 다른 글
Spark DataFrame (PySpark) (0) | 2020.04.20 |
---|---|
R을 이용한 Bioinformatics (Bioconductor) (1) | 2020.04.20 |
Elastic Search란? (0) | 2020.01.20 |
Apache Spark란? (0) | 2020.01.09 |
R을 활용한 빅데이터 처리 (2) | 2020.01.02 |