BE <java>/hadoop + spark

pyspark 입문 + 정리

stand 2023. 12. 11. 14:43

 

- pyspark ?

apache에서 지원하는 데이터 처리 기술로 python 언어가 기반인 cluster framework이다 cluster framework는 여러 서버로 연결하여 하나의 시스템으로 동작하는 집합체를 의미한다 처리 속도와 확장성을 보여주며 대규모 데이터를 처리하는 과정을 가진다 이는 master-node / slave-node 구조와 대용량 관리 서버와 manager로서 resource관리, 스케줄링 관리 로 구성되어진 구조이다 대표적으로 apache의 hadoop이 유사한 서비스로 구성된 구조이고 java 언어로 기반하며 여러 api를 통해 구현하고 여러 오픈소스를 통해 확장한 architecture로 알려져 있다 spark는 분산 처리를 지원하는 기술이고 youtube와 같은서비스를 실시간으로 지원하는 data streaming을 가진 기술은 Flink가 있다

 

-분산 처리

데이터를 각 node에서 처리하고 결과를 병합하는 구조를 가지는데 이를 data-parallel 이라 불린다 여러 node는 RDD에 정의되는데 RDD는 분산 collection으로 array/list 구조를 가지고 분산처리를 수행하는 데이터 구조이다  RDD는 변환을 통해 새로운 RDD(resilient distribute data)를 처리하는데 narrow transformation / wide transformation 의 2가지 유형의 변환방법을 제시하고 있으며 새로운 RDD는 변환처리 전 같은 요소의 RDD와 map / flatmap / zip / fillter 를 처리하는 방식과 변환 전 RDD에서 다른 RDD와 같은 요소를 함께 처리해서 key-value 구조를 가지는 RDD 방식으로 구분한다 후자의 처리는 독립partion 단위로 분산 처리를 수행하기 때문에 shuffle이라고 불려진다 cluster 환경에서 spark는 RDD 생성과 연산처리를 하기 위한 resource 요구를 hadoop의 yarn 체계가 대표적으로 처리하고 있으며 각각의 slave들은 driver programe이 제어하는 것에 진행된다

 

driver가 구동되면 RDD의 생성은 task 단위로 각각의 node 안에서 patiton 단위로 분산되어 처리하는데 RDD는 instance로 처리되어 cluster에서는 instance정보로 RDD 본 정보 / 변환 전 RDD 정보, 변환 후 RDD 요소 type, 변환 후 RDD 파티션 수, RDD 생성에 load되어야 하는 형태, shuffle 발생, RDD 영속성 처리 이렇게 정보를 모으고 나서 스케줄링에 변화를 가지는데 RDD에 처리 단위로 전과 후의 partition은 해당 정보를 위해 별개로 처리하는 것이고 두 RDD 분리에 대한 network 상태도 요구되고 있다 또한 RDD는 영속성, immutable 상태에 있어야 하며 장애가 발생 시 장애 경로의 RDD를 전부 버리고 동일한 처리를 요구한다 RDD는 연산이 요청되었다면 metadata로서 처리하는 것이고 명령어들을 계산할지 textfile() 구조로 코드를 구성한다고 생각할 수 있다 이것이 Lazy Evaluation 의 성질로서 변환 처리가 이뤄지기 전 까지 호출하는 명령어 전까지는 연산의 구조만을 지닌 metadata라는 것이다

 

- key-value RDD

metadata는 pairs RDD로 본 RDD를 map하여 새로운 RDD를 생성한다면 값, list 등 처리가 가능하고 key와 value로 처리를 수행하며 reduceByKey, groupByKey, sortByKey로 Reduction 연산을 가진다 *(중복값을 더해서 새로운 값으로)

이러한 개념은 shuffling에서도 partion이 이동하여 중복의 값을 새로운 값으로 저장하는 구조로 이어진다 여기서 확장한 개념이 wide transformation, narrow transformation이다 메모리 사용을 최대한 하면서 resource 활용을 합리적으로 사용하지만 partition 이동은 결국엔 반복작업으로 이어지므로 task위치에서 작업을 수행하는 persist로 올린다 (storage LV)

spark를 기동하기위해서 sparkcontext를 활성한다면 slave-node에 task는 작업을 할당받아 action과 transformation을 수행하는 구조이다 RDD transformation 연산에서는 map으로 반환값을 찾고 action 연산에서는 reduce로 cluster에 작업을 할당받는 것이다 반환된 결과는 spark-driver가 받게된다 map(lambda :)에 의해 지정된 인수들은 list[0] + list[1].... 로 reduce작업이 수행되고 partion으로 task는 연산의 식을 미리 가지고 있기 떄문에 수행 명령이 떨어지면 곧바로 결과를 호출하는 것이다

 

각 slave-node는 daemon에 따라 resource 할당을 요구하는데 partition들은 task와 executor를 수행하면서 lifecycle이 시작된다 driver는 호출된 결과를 받게되면서 resource를 요구하는데 slave는 각각의 상태를 확인하고 수행한 결과에 따라서 전송받은 driver가 cluster manager에 상태를 확인하는 요청으로 종료된다 대화형 체계에서 sparksession을 사용하고 RDD 연결을 확인하는 sparkcontext를 이용한다 main process가 활성되면 resource와 aplication 설정에 접근하고 cluster와 연결및 RDD를 생성하게 된다 이떄 RDD에는 연산의 형태만을 가진 상태로 Lazy Evaluation를 가지다가 transformation이 수행되되면 rdd.map(lambda :) 와 mapred_rdd.reduceByKey(lamabda : )을 수행하고 action으로 driver에게 result를 보내게 된다 따라서 transformation에 반응하는 함수는 map(), flatmap(), fillter(), distinct(), reduceByKey(), groupByKey(), mapValues(), flatMapValues(), sortByKey() 가 있으며 action에 반응하는 함수는 collect(), count(), countByValue(), take(), top(), reduce(), fold(), foreach() 가 있다

 

spark에는 분산환경에서 열 데이터의 partition에 조작하기 위해 narrow transformation이 독립적인 처리를 진행하는 방법으로 map(), filter(), union(), sample() 등이 있으며 result RDD에 접근해서 다른 partition에 접근하는 wide transformation 방법으로 재분배로 이해하고 필요에 따라선 cluster 이동도 가능하고 join(), reduceByKey, groupByKey가 있으며 이 방법을 shuffling이라 불려진다 

 

- key-value reduction action

reduce(), fold(), groupBy(), aggregate()는 mapreduce 에서 분산 처리를 수행하는 방법으로 연관있는 값으로 결과를 만들며 transformation로 groupByKey,  reduceByKey, mapValues, key, join, leftOuterjoin, rightOuterjoin 등을 수행하고 action으로 countByKey를 수행한다 RDD는 executor에 task에 따라 결과를 모아 driver에 보내지고 병렬구조에서 data가 중복이 없는 group으로 작업을 수행하고 groupByKey 메서드 처럼 key를 통해 새로운 RDD를 뽑아내는 작업을 가지는 것이다

 

- persist & driver + window(OS)

shuffling이 일어나는 join, leftOuterJoin, rightOuterJoin, groupByKey 등 작업은 반드시 많은 부하를 일으키게 되는데 groupByKey를 수행하면서 reduceByKey를 수행하면 그룹화를 반복으로 수행하면서 key값에 중요도를 높이게 된다 이는 shuffle을 2번 일으키는 작업이 되며 partition에 효율적이지 못한 작업인 것이다 따라서 지향해야할 목적은 최대한 task-task에서 In-Memory를 구축하고 Lazy를 이용하는 것이고 직렬화, RAM+DISK resource 이용 등 한정된 자원에서 효율성을 고려해야 한다 동시에 query가 되는 구조로 RDD가 정확히 partition으로 존재해야 한다 partition 크기를 설정하는 것은 처리과정에서 성능을 보여주는 것으로 shuffle partition, hash partition, range partition 등이 있다 data 요소 정보를 확인하고 partition 크기를 결정하는 것이 정확한 방법이며 쏠리는 현상(skew)을 최소화하기 위해 query 최적화, partition 증대 등 방법을 찾아야 한다 또한 join, ~groupBy 과 같이 core의 처리량은 partition의 2배~3배를 잡아야 하고 partition size는 300mb를 최대한 넘지 않는 크기로 지향한다

 

C:에 따라 연산이 시작되면 cpu에 접근하고 L1 cache L2 cache L3 cache ram hdd/sdd 순서로 접근하는데 처리해야 할 데이터가 많아지면 자연스럽게 속도는 느려질 것이며 따라서 hadoop의 mapreduce 체계는 분산 구조로서 여러 메모리에 접근하면 master가 처리하고 slave가 연산을 수행을 하고 반환 시키는 것이다 또한 task로 cluster 단위 해결하는 spark는 하나의 파일처럼 연산 결과를 보여주고 immutable로 정의하기 때문에 떠오를 수 있었던 기술이다

 

분산 처리의 실패는 통신 실패의 문제가 있는데 처리 순서로 map과 reduceByKey 로 진행하는 건 맞지만 서버에 부하가 걸리지 않도록 sortByKey, coalesce 등 partition에 처리 용량을 줄이던가 key에 접근을 새로운 RDD로 생성해서 구현한다

 

RDD를 사용하는 경우 api를 통해 현실 데이터를 논리 세계에서 적용되도록 배치를 진행해야 한다 분산 환경이라면 action 연산을 수행하는 곳과 transformation을 수행하는 곳 분리된 환경으로 저수준의 기술을 통해 partition 개념이 이용된다 사용자의 스케줄링이 메모리 resource를 파악하고 효율적인 수행을 위해 task 단위로 node를 병렬처리 한다

저수준 api의 방법으로 dataframe 과 key-value 방법이 있으며 dataframe은 row 타입으로 객체 RDD를 생성하고 collection에 java format을 이용해 객체 초기화를 진행하며 연산을 가질 수 있다 spark는 논리세계로 넘겨주기 위해 현실 데이터를 dataframe으로 변환한 것이고 cluster로 RDD 처리를 진행하는 것이다 그래서 저수준 api로 sparkContext로 parallelize를 호출하여 partition 개념으로 접근, 사용자의 patitioning이 가능하다 이 과정에서 skew(데이터 몰림 현상)이 발생되고 구조걱인 형태(구조적 api, 추상화된 데이터)는 여러 key를 가지고 python의 dataframe으로 접근하며 spark는 논리 세계에서 물리적 실행을 위해 직렬화를 자동으로 가진다 spark cluster에서 실행되는 thread로 executor는 driver에 각 node의 result를 보내주는데 분산 구조에서 데이터 크기가 크다면 고수준의 partitioning을 가져야 resource의 효율성을 가질 수 있는데 저수준에서 최대한 수행할 수 있는 방법은 분산형 공유 변수를 이용한다

 

 

'BE <java> > hadoop + spark' 카테고리의 다른 글

hadoop study2 (+hive +sqoop +pig)  (0) 2023.12.13
hadoop(ubuntu) study  (0) 2023.12.09