Data/Bigdata

Apache Spark 기능

안녕하세요 씨앤텍시스템즈입니다.

 

이번 포스팅은 이전 포스팅인  Apache Spark란?에 이어서 Spark 기능에 대해서 살펴보겠습니다.

 


Apache Spark의 다양한 기능 중 메인 기능인 아래 3가지를 주로 살펴보겠습니다.

▷ Spark SQL / Spark DataFrame

▷ Spark Streaming

▷ Spark ML

 

 

1. Spark SQL / DataFrame

 먼저 DataFrame은 Spark에서 데이터를  Excel과 같은 스프레드시트 형태로 로드하여 처리 할 수 있는 기능입니다. 흔히, Python의 Pandas패키지의 DataFrame과 R에서의 DataFrame와 동일한 형태와 처리방법이라고 생각 할 수 있습니다.

다양한 데이터 형식(JSON, CSV, TEXT 등)데이터저장소(Local, HDFS, HBase, AWS 등)에서 데이터를 로드하여 DataFrame으로 생성 할 수 있고, 데이터 처리에 필요한 다양한 서브 함수를 제공합니다.

 

<DataFrame 예시>

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

 Spark SQL은 Spark DataFrame으로 생성된 데이터를 Ansi-SQL의 형태로 질의를 할 수 있는 기능입니다.  아래와 같이 TempView로 DataFrame을 등록하여 spark.sql() 함수를 이용하여 데이터를 SQL로 처리 할 수 있습니다.

 

<Spark SQL 예시>

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

 

2. Spark Streaming

 Spark에서 실시간 처리를 처리 할 수 있는 Streaming 기능을 제공합니다. 즉, 지속적으로 유입되는 데이터에 대해 유연하게 대처 할 수 있게 RDD기반으로 처리 할 수 있습니다.

 또한, Spark 2에서는 Structured Streaming이라는 새로운 Streaming 방식을 지원하는데, 데이터를 RDD 기반이 아닌 DataFrame 형식으로 Streaming 데이터를 처리 할 수 있게 지원하는데 이는 DataFrame의 기능과 SQL을 동시에 사용하여 처리 할 수 있음을 말합니다.

 

<Spark Structured Streaming 예시>

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

 

3. Spark ML

 Spark에서는 데이터과학에서는 필수인 Machine Learning 기능도 제공하고 있습니다. Mllib과 ML이라는 2가지 기능을 제공하고 있으며, Mllib은 RDD기반/ML은 DataFrame 기반으로 처리 할 수 있습니다.

 지도학습 알고리즘(회귀, 분류)과 비지도 학습 알고리즘(군집 등)을 지원하며, 데이터 입력 시에는 Spark에서 지원하는 Vector형식으로 변형하여 사용합니다.

 

<Linear Regression 예시>

from pyspark.ml.regression import LinearRegression

# Load training data
training = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

 

<RandomForest 예시>

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

 


이상으로 Spark의 기능에 대해서 전반적으로 살펴 보았습니다.

빅데이터 기술이라면 절대 빼놓지 않고 사용되는 Spark인 만큼, 사용해 보시길 권장드립니다.

 

감사합니다.

728x90

'Data > Bigdata' 카테고리의 다른 글

Spark DataFrame (PySpark)  (0) 2020.04.20
R을 이용한 Bioinformatics (Bioconductor)  (1) 2020.04.20
Elastic Search란?  (0) 2020.01.20
Apache Spark란?  (0) 2020.01.09
R을 활용한 빅데이터 처리  (2) 2020.01.02