오늘날의 어플리케이션에서는 데이터가 핵심임:

  • 데이터의 양이 많아진다
  • 데이터가 복잡하다.
  • 데이터가 빠르게 변화한다

 

 

데이터 중심 어플리케이션의 제일 중요한 3요소:

  • 신뢰성
  • 확장성
  • 유지보수성

 

 

어플리케이션의 3가지 유형:

  • 온라인 API 서버: Latency 와 가용성이 특히 중요.
  • 배치 시스템: Performance 와 Scalibilty 가 중요.
  • 스트림 시스템: 신뢰성과 Performance 와 확장성이 중요.

 

 

대규모 데이터 처리를 위한 맵리듀스:

  • 병렬 처리 매커니즘임.
  • 확장성이 있고 내결함성이 있는게 주요 특징
  • 대규모 일괄 처리 시스템에서는 맵리듀스가 특히 중요했지만 이제는 좀 다르다고 함. (그것을 뛰어넘는 기술들이 등장. 이후에 소개) 

 

 

Batch Processing 의 목적:

  • OLTP 와 같이 소량의 데이터를 보고 결과를 내는 목적은 아님.
  • Batch Processing 을 하는 대표적인 방법이 MapReduce
  • 대량의 데이터를 어떠한 분석 목적으로 처리해서 비즈니스 결정에 도움이 되는 레포트를 만드는 것과 같을 것.

 

 

HDFS vs Hadoop vs Hive vs HBase:

  • HDFS (Hadoop Distributed File System):
    • 하둡에서 사용하는 분산 데이터 파일 시스템임
    • 데이터를 여러 노드에 분산해서 저장하는 고가용성을 지원함.
  • Hadoop:
    • 대규모 데이터 처리를 위한 분산 컴퓨팅 프레임워크임.
    • 데이터를 처리해서 HDFS 에 기록하는 역할을 한다.
  • Hive:
    • Hadoop 위의 추상화된 도구임. SQL 과 같은 언어를 이용해서 대규모 데이터를 쿼리하고 분석할 수 있게 해줌.
  • HBase:
    • HDFS 위에서 구축된 데이터베이스로 대규모 데이터를 빠르게 엑세스 할 수 있는 NoSQL 임
    • Column 지향 저장소이다.
    • 대규모 데이터를 실시간 읽기, 쓰기 기능을 지원해준다.

 

 

MapReduce 보다 높은 처리량을 달성할 수 있는 프로그래밍 모델:

  • Spark:
    • MapReduce 연산을 메모리 레벨에서 달성하는 것으로 처리량을 높임.
    • 작업을 DAG 로 표현하고, Lazy Evaluation 을 통해서 불필요한 연산을 지우고 최적화된 실행 계획을 달성함.
  • GPU 기반:
    • 수천개의 코어와 고속 메모리를 가지고 있는 GPU 연산은 훨씬 강력할 수 있다.
    • 특정한 연산은 GPU 를 사용해서 처리하는게 더 높은 성능을 달성할 수 있다
  • MPI (Message Passing Interface):
    • 분산 메모리 환경에서 병렬 컴퓨팅을 수행하기 위한 표준화된 통신 프로토콜을 사용한다.
    • MPI 는 각 프로세스가 자체 메모리 공간을 가지는 분산 메모리 환경에서 동작하고 여러 프로세스를 병렬로 실행하는 모델이다. 이렇게 말하면 Map Reduce 와 차이가 없다라고 얘기할 수 있는데 더 효율적인 통신 패턴을 가진다.
    • 여러가지 통신 패턴을 지원하면서 통신 오버헤드를 줄이는게 이 처리 방식의 강점이다.
    • 통신 방식은 점대점 통신으로 두 프로세스간의 직접적인 통신을 지원하기도 하고, 집합 통신으로 여러 프로세스간의 데이터 교환을 효율적으로 지원한다.
    • 이런 직접적인 통신 방식은 기본적인 통신 방식보다 더 오버헤드가 작다고 함. 반면에 MapReduce 는 데이터 전송을 위해 중간 결과를 디스크에 쓰고 다시 읽는 과정을 거치므로 좀 더 성능적으로 떨어짐. 그리고 MapReduce 는 Map-Reduce-combine 이라는 통신 패턴만 지원한다.

 

 

배치 처리 효율적인 처리 방식 팁:

  • Putting the computation near the data 원칙. 데이터가 있는 곳 근처에서 연산을 수행하라는 원칙. 네트워크틀 통해서 데이터를 보내면 네트워크 리소스 사용량이 많이 드니까.
  • 맵리듀스의 폭발적인 처리 능력의 팁은 파티셔닝임.

 

 

Unix Shell 의 대규모 데이터 처리 방식: 

  • Unix 의 sort 는 데이터 셋이 메모리보다 클 때 데이터 셋을 쪼개서 디스크에 저장하고, 이들을 효율적으로 정렬하는 외부 정렬 알고리즘으로 사용할 ㅅ수 있는 Merge Sort 를 이용한다.
  • 정렬 방식은 다음 4가지 단계로 이뤄진다:
    • 1) Chunking: 데이터 파일을 읽어 가능한 메모리 크기만큼의 작은 청크로 나눈다. Chunk 는 메모리 크기에 맞춰진다.
    • 2) In-Memory Sorting: 각 청크를 메모리에 올려서 정렬한다. 이 단계에서는 빠른 정렬 알고리즘(예: Quick Sort, Heap Sort, 또는 Merge Sort)을 사용한다.
    • 3) Temporary Storage: 정렬된 청크를 디스크의 임시 파일에 저장한다.
    • 4) Merging: 마지막 단계에서는 모든 정렬된 청크를 병합하여 최종 정렬된 파일을 만든다.
  • Merging 과정이 효율적임:
    • 정렬된 청크를 병합하여 하나의 정렬된 청크를 만드는 과정은 일반적으로 두 개의 정렬된 청크를 포인터로 읽으면서 합치는 방식으로 이뤄진다.
    • 이는 효율적인 O(n) 의 시간 복잡도를 가짐.
    • 이 과정은 디스크를 읽으면서 이뤄지는데, 한번의 디스크 I/O 에서 두 개의 청크를 읽어와서 병합된 결과를 디스크에 쓰는 건 Linear Disk I/O 를 가능하게 한다.

 

 

Disk 데이터를 읽어오는 부분을 더 최적화 하는 방법:

  • Disk 데이터를 이진 형식으로 저장하는 것.
  • Disk 데이터를 압축해두는 것.
  • 여러 스레드나 프로세스에서 시스템 콜을 병렬로 호출해서 디스크 데이터를 병렬로 읽어오는 것
  • 큰 파일의 전체 내용을 한번에 메모리에 올리지 않고 필요한 부분만을 올리는 방법인 메모리 매핑(Memory Mapping) 을 이용하는 것도 방법이다.

 

 

유닉스 철학:

  • Make each program do one thing well:
    • 하나의 테스크를 잘하는 프로그램을 만들어라.
    • 그리고 새로운 기능이 필요하면 기존 코드를 수정해서 복잡하게 만드는 것보다 새로운 프로그램을 만드는 것이 낫다.
  • Expect the output of every program to become the input to another:
    • 하나의 프로그램이 만든 Output 이 다른 프로그램의 Input 이 될 수 있다는 생각을 하라는 것.
    • 이렇게 해야지 서로 결합하기가 쉽다. 예시로 복잡한 이진 파일 포맷 구조를 Input 으로 받도록 되도록 하지 말라는거임.
  • Design and build software to be tried early:
    • 소프트웨어는 빠르게 프로토타이핑 하고 테스트할 수 있어야한다는 것.
    • 잘 작동하지 않다고 생각하면 이 방법은 버리고 다른 방법으로 새로 작성하는 것이 가능해야한다는 것.
  • Use tools in preference to unskilled help to lighten a programming task:
    • 프로그래밍 문제를 풀기 위해서 구체적인 툴을 만드는 것이 필요할 때가 있다는 것. 설령 그것이 한번만 사용된다 하더라도.

 

 

잘 만들어진 Unix Tool 의 특징:

  • 유닉스 철학의 2번째 원칙이 적용됨. Unix 에서 uniform interface 는 File 로 표현된다. (정확하게는 File Descriptor 지만) 이런 File 은 바이너리 형식으로 표현되기 보다는 ASCII Text 로 표현되기 때문에 서로 결합되기 쉽다.
  • Input 데이터는 Immutable 하다. 그래서 실패하더라도 언제든지 rerun 시킬 수 있다. (Side-effect 를 만들어내지 않는다)
  • 파이프라인으로 처리할 때 중간 결과물을 저장할 수 있고 처리 중간에 실패했을 때 중간 결과물부터 재개할 수 있다.
  • “less” 명령을 통해서 처리를 중단하고 출력해볼 수 있다.

 

 

하지만 이런 Unix 머신 조차도 배치 프로세싱 도구로 잘 만들어졌지만, 단일 머신이라는 단점이 있기 때문에 다중 머신의 컴퓨팅 파워를 가진 MapReduce 를 이용한 Hadoop 이 등장한 것.

 

1. MapReduce and Distributed Filesystem

대표적인 분산 시스템인 HDFS (Hadoop Distributed File System) 을 위주로 설명:

  • Unix Tool 과 같이 Input 을 변경 시키지 않음. Side Effect 를 유발하지 않는다.
  • Input 으로는 File 을 받고, Output 으로도 File 을 낸다.
  • Shared Nothing Principle 을 적용했다:
    • 대표적인 반대 개념이 NAS (Network Attached Storage) 와 SAN (Storage Area Network) 가 있음. 이것은 Shared-disk storage 서비스로 특별한 네트워크 채널을 통해서 공유하는 개념이다.
    • Shared Nothing Principle 은 특수한 하드웨어, 특수한 네트워크가 필요없다라는 거임. 보편적으로 사용할 수 있다는 것.
  • HDFS 는 각 노드마다 데몬 프로세스가 동작함. 이건 netowrk service 로 다른 노드들이 해당 노드가 가진 데이터에 접근할 수 있도록 하는 거임.
  • 중앙에는 NameNode 라는 개념이 있다. File block 이떤 노드에 있는지 추적할 수 있는 역할을 해서, 여러 머신에 있는 디스크를 합쳐서 거대한 file system 을 만들 수 있도록 한다.
  • 파일 데이터에 고가용성을 부여하기 위해서 파일 데이터는 여러 머신에 복제되기도 한다.
    • 복제 방식은 단순히 같은 데이터를 여러개 복사본으로 저장하는 Simple Replication 일 수도 있고, Reed-Solomon 코드와 같은 Erasure Coding 을 이용해서 데이터를 일부 코드화 하여 저장하고, 일부 데이터가 없어지더라도 이를 복구할 수 있는 매커니즘을 이용할 수도 있다. 이 방법은 더 적은 공간을 사용하지만 복구 시간이 더 길다는 점이 있음.

 

 

MapReduce Job 생성:

  • 배치 프로그램에서 일반적으로 다음과 같이 4가지 과정이 있을 수 있음:
      1. Input Files 을 읽어오고 Records 로 분해하는 과정.
      1. Mapper function 을 통해서 record 에서 key, value 로 뽑아서 transform 해서 output 으로 (key, value) Pair 를 맏느는 것.
      1. 모든 key, value 를 Sort 와 같은 작업을 하는 것.
      1. Reducer 를 통해서 key-value Pair 를 순회하면서 같은 key 에 대한 작업을 수행하는 것. (해당 key 가 몇번 등장했는지 같이.)
    • MapReduce 는 여기서 2번과 4번 과정을 코드로 작성하는 걸 말한다. 1번과 3번 과정은 Parser 와 MapReduce 내재된 기능을 이용하면 되니까.
  • MapReduce 의 Job 은 배치 프로그램에서 Mapper 와 Reducer 를 만드는 것으로 생성할 수 있다.
    • Mapper: Input Record 를 취해서, Transform 해서 최종적으로 (Key, Value) Pairs 를 만드는 것.
    • Reduceer: 같은 키에 해당하는 데이터를 모우고 처리해서 Output Records 를 만들어내는 것.

 

 

HDFS 에서의 MapReduce 분산 실행 과정:

  • Unix Command 처리 방식과 비교해보았을 때 MapReduce 처리 방식:
    • 여러 머신으로의 병렬 처리를 내부적으로 수행해준다.
    • Mapper 처리에서 Reducer 로의 데이터 흐름에 대해서 신경쓰지 않아도 된다. 알아서 잘 해줌.
    • 프로그래밍 언어로 작성하면 된다.
  • MapReducer Scheduler 는 putting the compuation near the data 원칙을 따라서 스케줄링 해준다:
    • HDFS 에서 데이터는 여러 머신으로 분산되어 있음.
    • 그리고 각 머신마다 CPU 나 메모리와 같은 리소스는 다를 거임. 그래서 실행되는 mapper 코드와, File block 의 데이터 사이즈를 고려했을 때 해당 데이터를 가진 적절한 머신을 선택해서 더 효율적인 처리를 가능하게 해준다.
  • 실행되어야 할 작성된 Map 코드는 MapReduce Framework 에 의해서 제출되면 해당 프레임워크는 적절한 머신으로 코드를 복사해서 전달해줌. 그러면 이제 MapReduce Job 은 시작할 수 있다.
  • 실행되는 Mapper 의 수는 사용되는 input File block 의 수에 의존한다.
  • Reduce 역할을 하는 Task 의 수는 Job 을 실행하는 관리자에 의해 설정될 수 있음.
  • Mapper 의 결과로 나온 Output 은 (key, value) 쌍으로 되어있는데 Reducer 를 고르는 매커니즘은 Hash 함수이다.
  • Mapper 의 결과로 나온 Output 은 Reducer 로 전달되기 전에 먼저 정렬이 됨.
    • 정렬을 하기 전에 Mapper 는 Reducer 별로 보낼 데이터를 Partitioning 을 한다. 여기서 Reducer 에게 보낼 해시 함수가 사용됨.
    • 정렬된 대규모 데이터는 Mapper 의 로컬 디스크에 정렬된다.
    • 정렬된 이후에는 정렬되었다고 알려주면 MapReduce Scheduler 는 Reducer 에게 받을 준비하라고 알려줌. 그러면 Reducer 는 Mapper 와 연결되고, 데이터를 다운로드한다.
      • 이렇게 Reducer 가 Mapper 와 연결해서 데이터를 다운로드 하는 과정을 Shuffle 이라고 함.
    • 이렇게 정렬을 하는 이유는 동일한 키들을 모아서 Reducer 로 배치로 효율적으로 보낼 수 있음. 그리고 Reducer 입장에서는 정렬된 데이터가 오니까 효율적으로 병합할 수 있다.
  • Reducer 는 이렇게 Mapper 로 부터 취한 데이터를 프로그래머가 작성한 코드를 바탕으로 실행해서 처리하고, output records 를 생성한 후, 파일에 저장한다. 이렇게 생성된 파일은 여러 머신으로 복제가 됨.

 

 

Map Reduce 에서의 Workflow 관리:

  • Hadoop HDFS 에서는 Map Reduce 하나의 Job 을 실행할 수 있다. 그래서 Reducer 과정이 끝난 후 다시 이 데이터를 Input 으로 취급한 후 Output 을 생성하는 작업을 Chaining 식으로 엮을 수 없음.
    • Hadoop 자체에서는 이게 어렵지만, Hive, Pig, Cascading, Crunch 와 같은 고수준의 Hadoop 툴을 이용하면 가능하다.
    • 이렇게 Job 과 Job 이 서로 연결되게 실행하는 걸 workflow 라고 한다.
    • Hadoop 에서는 이게 불가능하니까 독립적인 두 개의 Job 으로 해결한다. 하나의 Job 이 결과를 만들어내서 output directory 에 기록해놨다면, 다음 Job 이 해당 디렉토리에서 데이터를 읽어서 다시 MapReduce 작업을 하는 것으로 함. 이런 방식은 장단점이 있다.
  • 이런 Workflow 의 필요성 (Job 과 Job 사이의 의존성이 있는 경우) 때문에 여러가지 workflow scheduler 가 생겨나기도 함. 대표적인게 Apache Airflow.

 

 

Batch Processing 에서의 Join:

  • 관계형 데이터베이스에서는 Join 을 할 때 더 빠르게 연산하기 위해서 Index 를 사용한다. 하지만 MapReduce 연산에서의 Join 은 일반적인 데이터베이스와는 다르게 Index 를 사용할 수 없다.
  • 그렇기 때문에 전체 데이터를 다 살펴보는 Full Scan 을 해야함.
  • 소량의 데이터에서는 이런 비용은 굉장히 비싼 연산이지만 배치 연산에서는 그래도 전체 데이터를 집계해서 처리해야하는 연산이 대부분이라서 그나마 괜찮다고 함. 뭐 특정 사용자만을 위한 연산 보다는 데이터 분석과 같은 연산이 많은 편이니까.

 

 

데이터 분석과 같은 배치 작업은 어떻게 작업을 시작할 수 있을까?

  • 다음 예시처럼 User Activity events (활동 중인 사용자) 와 User Database 에 있는 Profile 정보를 조인해야한다고 생각해보자. 조인이 필요한 이유는 이 예시 기준으로 어플리케이션에서 사용자의 나이에 맞춰서 페이지를 보여주기 위해서임. 이 사례에서는 Denoramalization 을 하기에는 적합한 경우가 아니라서 조인을 해야한다.
  • 그러면 모든 사용자마다 Database 에 건건히 질의하는 식으로 조인을 해야할까? 네트워크 통신을 매 레코드마다 한다는 건 상당히 시간이 오래 걸리고, 성능적으로도 안좋고, 데이터베이스에도 많은 부하를 줄 수 있어서 좋지 않다.
  • 이렇게 많은 수의 데이터를 처리하기 위해서는 가능한 "하나의 마신에서 네트워크 연산이 없도록 만드는게" 중요하다. 그래서 데이터를 ETL 프로세스를 통해서 파일로 복사해오는게 필요함.
  • 이렇게 사용자의 활동 로그 정보와, 사용자의 프로필 정보를 파일로 각각 복사해와서 MapReduce 연산을 하는거지.

 

 

위 에시 기준으로 Sort-merge 매커니즘:

  • 두 개의 Mapper 를 쓴다. 하나는 사용자의 활동 데이터를 기준으로 각 사용자의 Id 가 Key 가 되고, 로그 데이터가 Value 가 되는 거, 두 번째 Mapper 는 사용자의 ID 가 키가 되고, 사용자의 프로필 데이터가 Value 가 되는 것.
  • 이렇게 두 가지의 데이터를 키를 기준으로 Reducer 에서 합쳐서 처리하는 매커니즘을 Sort-Join 이라고 한다.
  • 이때 Reducer 로 보내는 데이터는 key 를 기준으로 정렬이되서 보내지지만, 키 이외의 데이터인 A Mapper 의 데이터가 먼저 보도록 하거나, B Mapper 의 레코드의 경우에는 timestamp 를 기준으로 정렬을 하는 등 내부 정렬을 한번 더 수행하는 걸 Secondary Sort 라고 부른다.
  • 이처럼 처리할 관련 데이터들을 Reducer 에 순서대로 잘 모아주도록 만드는게 High Throughput 과 Low memory overhead 를 달성할 수 있다.
  • 물론 여기서도 네트워크 통신은 있지만 배치 형식으로 통신을 하기도 하고 어디에서 데이터를 모을지 같은 건 우리가 걱정하지 않아도 됨.

 

 

MapReduce 에서 Group BY 연산은 Sort merge join 과 유사한 점이 많음:

  • 이것도 Mapper 에서 만든 (key, value) Pair 를 Reducer 에서 모우는 거니까.
  • Group BY 연산은 대표적으로 이럴 떄 쓰임:
    • Counting
    • Sum
    • Ranking
    • Sessionization (특정 사용자의 활동 내역을 모두 모아서 비교, 대조하는 것)

 

 

MapReduce 에서 Hot Key 를 다루는 방법:

  • 확실히 Hot Key 가 있다면 하나의 Reducer 의 처리 작업이 지연되어서 전체 workflow 가 지연되는 문제가 생길거임. 여기서는 Hadoop 의 High Level 도구인 Hive, Pig, Crunch 에서는 Hot Key 문제를 어떻게 다루는지 보자.
  • Skewed Join (Pig 의 방법):
    • Key 를 Sampling 해서 Hot Key 를 탐지한다. 그리고 랜덤적으로 여러개의 Reducer 를 골라서 Hot Key 의 데이터를 파티셔닝한다. 해당 키와 조인을 하는 다른 데이터들은 Hot Key 를 처리하는 모든 Reducer 에게 복제되서 전송된다.
    • 이렇게 각 Reducer 에게 처리된 이후에는 다시 하나로 합치는 두 번째 MapReduce 작업을 통해 처리됨.
  • Shared Join (Crunch 방법):
    • Skewed Join 과 유사하다. 그러나 Sampling 과정이 없고, 개발자가 명시적으로 Hot Key 를 지정해줘야한다. 이 점이 다름.
  • Map Side Join (Hive 방법):
    • SKewed Join 을 좀 더 최적화 한 방법임. 다음 글에서 좀 더 자세하게 다루고, 여기서는 Hot Key 에 대한 정보를 별도의 테이블 메타데이터 파일로 가지고 있고 Map side Join 을 통해서 해한다는 점만 알면 된다.
  • 공톶적인 건 여러개의 Reducer 로 Hot Key 를 파티셔닝 한다는 점. 그리고 여러개의 Reducer 의 작업을 다시 하나로 합쳐야 한다는 점은 알고있자.

 

 

이전까지 살펴본 방법은 Reducer-side Join 임:

  • Mapper 가 Join 을 위한 데이터를 모두 준비하는 과정을 말한다. 그래서 Reducer 는 어떠한 노력을 하지 않아도 됨.
  • 여기서 데이터를 정렬해서 파일에 쓰고, 데이터를 배치로 Reducer 로 보내고, Reducer 에서는 여러 Mapper 가 보낸 데이터를 merging 하고 이런 과정에서 여러번 I/O 기 일어나는게 Reducer side join 의 단점이라고 함.

 

 

Reducer side Join 에서의 여러번의 I/O 단점을 극복한 게 Map-side Join 이다.

  • 훨씬 빠른 조인 기법으로 Reducer 와 Sorting 이 필요없는 기법임.
  • Mapper 가 파일에서 데이터를 읽고나서 output 으로 쓰기만 하면 되는 과정임.

 

 

Map-side Join 1) Broadcast Hash Joins 소개:

  • 두 개의 데이터를 조인한다고 했을 때 상대적으로 하나의 데이터가 소량이라면, 메모리에 올리기 적합하다면 이 데이터들을 모든 Mapper 가 해시 테이블로 가지고 있고, 큰 대규모 데이터만 Mapper 에게 파티셔닝 하도록 하는 방법이 Broadcast Hash join 이다.
  • 소량의 데이터들은 모든 mapper 에 복사하다는 뜻이 broadcast 한다는 의미와 같음.
  • 이 방법을 Pig 에서는 Replicated join 이라고 하고 Hive 에서는 Map Join 이라고 함.
  • 꼭 메모리에 맞는 Hash Table 을 사용할 필요 없다. Hash Table 인덱스를 Segment 식으로 나누는 것도 가능함. (이 경우에는 각 Segment 가 담당할 범위를 알아야하므로, key 값이 단조적으로 증가한다는 가정하게서 나누는 방법이고, Key-Value 의 인덱스만 있는게 아니라 Key Range - Segment File 의 별도 인덱스도 구축해야함. 이 경우에는 잘 쓰이는 데이터만 Page Cache 에서 쓰이곘다라는 기대를 가지고 사용하는 인덱스임.)

 

 

Map-side Join 2) Partitioned hash join 소개:

  • Join 을 하려는 두 데이터가 partition 되어 있을 때 적용할 수 있는 방법이다. 각 mapper 마다 처리해야할 데이터가 분할되어 있기 때문에 대규모 데이터 처리에서 강점을 가지는 방법임.
  • 작동 방식은 예시로 보면 쉽다:
    • 1) 데이터 분할:
      • 두 개의 데이터 셋 (사용자 프로필 데이터와 사용자 이벤트 로그 데이터가) 가 파티셔닝 되어 있다고 가정해보자.
      • 이 파티션된 데이터는 독립적인 Mapper 에 의해서 처리될 수 있다. 예를 들면 사용자 Id 값에서 마지막 숫자를 기준으로 처리한다라고 가정한다면 123 이라는 사용자 Id 값은 3번째 파티션에서 처리될 거임.
    • 2) Mapper 작업:
      • 예시를 기준으로 Mapper 는 자신이 처리해야하는 데이터에서 소량의 데이터에 해당하는 부분만 해시 테이블로 올리면 된다.
      • Broadcast Hash Join 은 모든 데이터를 Hash Table 에 올렸다면, 여기서는 자신이 처리해야 할 범위의 데이터만 해시 테이블로 올리면 됨.
      • 이렇게 해시 테이블로 올린 후 입력 데이터를 받아서 조인해서 처리해나가면 된다.
  • 이 방식을 사용하기 위해서는 조인을 위한 두 데이터가 동일하게 파티셔닝이 되어 있다는 전제가 있어야한다. 예를 들면 이전 MapReduce 작업에서 Grouping 을 했다던지.
  • Hive 에서는 이 조인 방법을 buckedted map joins 라고도 함.

 

 

Map-side Join 3) Map-side merge join 소개:

  • 이 기법은 Partitioned hash join 에서의 전제인 조인하는 두 데이터가 동일하게 파티셔닝 되어 있다는 전제에 추가로 해당 데이터가 정렬까지 되어 있다는 전제하에 실행할 수 있는 기법이다.
  • 이런 조건이 붙는다면 이제 소량의 데이터를 해시 테이블로 만들 필요가 없다. 정렬되어 있기 때문에 그 부분의 데이터들만 가져오면 메모리 수준에 맞을 것이기 때문임.
  • 이 조인은 MapReduce 의 Reducer 단계에서 조인을 하는 것과 유사한데, Mapper 측면에서도 조건만 붙는다면 충분히 가능하다.

 

 

Reducer-side join 과 Map-side join 의 선택:

  • Reducer-side join 은 일반적으로 더 유연하다. 어떠한 데이터 조건을 따지지 않고 실행할 수 있으니까, 반면 map-side join 은 데이터 크기, 파티셔닝 유무, 정렬 유무를 전제로 실행할 수 있는 조인 전략이기 때문에 성능적 유리함을 취할 수 있지만 까다로운 조건을 가지고 있다.
  • 어떤 조인을 선택할 지 유무는 downstream job 에 따라서 또 결정될 수 있다는 점을 고려해야한다. Reducer-side join 은 join key 를 기준으로 정렬되고 파티셔닝 되어 있는 반면에, map-side join 은 input 으로 취한 많은 양의 데이터를 기준으로 정렬되고 파티셔닝 되어 있을 것을 고려해야한다.
  • 추가로 데이터가 어떻게 저장되어 있는지, 정렬되어 있는지, 파티셔닝 되어 있는지, 데이터의 양은 어느정도인지에 대한 고찰이 필요하다. 이게 처리 방식에 영향을 주니까. 단순히 어디에 데이터가 저장되어 있는지, 인코딩 포맷은 무엇인지 아는 정도로는 부족하다.
  • Hadoop 에코 시스템에서는 Hive Metastor 이나 HCatalog 에 데이터의 파티셔닝 유무와 데이터의 사이즈 정보, 그리고 SORTED BY 구문을 이용해서 테이블이 생성되었다면 정렬 정보도 포함되어서 알 수 있다.

 

 

MapReduce 작업의 대표적인 예시 1: 검색 엔진을 위한 Inverted Index

  • Google 에서는 특정 키워드로 검색했을 때 탐색 해봐야하는 문서를 반환하는 검색 엔진을 만들 때 MapReduce Job workflow 를 이용했다고 함
  • MapReduce Job 으로 만들어 낸 결과물은 Inverted Index 로, 키워드와 문서의 쌍을 만드는 거임. 그래서 어떤 키워드를 검색했을 때 여러 문서들이 반환되도록 만드는 작업을 했었음. (물론 이건 아주 경량화된 모델이긴 함. 실제 이것 이외에도 검색 엔진에는 관련성, 랭킹 매기기, 유의어 검색, 스펠링이 틀린 것들 검색 가능 이런 부가적인 기능도 필요하니까)
  • 만약에 Document 가 변경된다면 배치를 다시 실행시켜서 다시 Inverted Index 를 만들고 한번에 교체하는 식으로 작업을 할거임. 이런 과정이 비효율적이라고 한다면 증분 처리에 대해서 배워야함. 변경된 내용만 업데이트 하는 방식으로 스트림 처리에서 좀 더 자세하게 살펴보자.

 

 

MapReduce 작업의 대표적인 예시 2: OLTP 를 위한 데이터 생성 후 적재

  • OLTP 를 위한 데이터 생성 후 적재의 예시는 머신러닝 시스템이다. 추천 시스템의 경우 사용자 id 를 제공해주면 그 사람이 관심가져야 할 아이템들이 출력되도록 데이터베이스에 저장해두고 있을 것. 이것을 MapReduce 작업에서 만들어서 OLTP 로 사용하고 있는 데이터베이스에 적재 하는거임.
  • MapReduce 작업에서 결과를 데이터베이스에 적재하는 과정은 어떻게 될까?
    • 배치 결과로로 생성된 데이터들을 OLTP 데이터베이스로 JDBC Client 같은 것들을 이용해서 건건히 넣는 건 굉장히 안좋은 전략임.
      • Network 통신을 레코드마다 해야한다는 점
      • Batch Processing 의 원칙인 All or Nothing 을 지킬 수 없다는 점.
      • 완료되지 않은 중간 결과물을 볼 수 있다는 점.
    • 그래서 추천하는 전략은 MapReduce 로 output 을 File 로 생성했다면 이것들을 가지고 Bulk Loading 을 하라는 거임. 이렇게 하면 중간에 배치 작업이 실패했다면 File 은 생성되지 않을 거임. 그리고 OLTP 로 사용할 데이터베이스에서 지원하는 Bulk Loading 을 이용하면 훨씬 성능 좋게 적재할 수도 있다.
    • 사용할 수 있든 대표적인 데이터베이스로는 HBase, Voldemort, Terrapin, ElephantDB 등이 있음.
    • 이렇게 Bulk Loading 한 이후에 새로운 데이터로 Switching 하는 형식을 사용하면 된다고 함. 이렇게 하면 뭔가가 잘못되었을 때도 Old Data 로 이용하면 되니까.

 

 

Batch Processing 철학:

  • Input 데이터는 어떠한 상황에서라도 불변성을 줘서 여러번 실행시키고 실험해볼 수 있도록 하고, 배치 과정 중 상태를 변경하는 등의 사이드 이펙트를 일으키지 않는 것.
  • 잘못된 배치 결과로 생긴 건 그냥 버릴 수 있도록 하는 것. (all or nothing 원칙)
  • 이전 배치 결과는 새로운 배치 결과들로 스위칭될 수 있도록 하는 것
  • 성능이 좋아야 한다는 것
  • 이러한 위의 특징들 때문에 배치 작업에서 뭔가 코드가 잘못되거나 버그가 있다고 한다면 그냥 다시 배치 프로그램을 rerun 시키면 된다. 아주 간단함. OLTP 로 사용하는 데이터베이스에 우리가 버그 코드로 잘못된 데이터를 넣었다면 이는 직접 수정을 해야겠지만 배치 프로그램에서는 이럴 필요가 없음. 그냥 rerun 시키면 된다.
  • 만약 배치 프로그램이 일시적인 문제로 잘못되었을 경우를 대비해서 Retry 를 일정 횟수 넣는 것도 충분히 좋음.

 

 

Data accessibility vs Efficient Data Modeling:

  • 효율적인 구조로 데이터를 모델링 해야만 데이터를 적재할 수 있는 것보다는 어떠한 형식이던지 데이터에 빠르게 접근할 수 있다는 점이 더 중요하다고 함.
  • MapReduce 전략은 이전에 없던 전략은 아님. 수십년 전에도 massively parallel processing (MPP) 라는 기술로 존재헀었음. 그럼에도 불구하고 MPP 가 조명받지 못한 이유는 이 데이터 스토어에 적재하기 위해서는 모델링을 해야한다는 점 때문이었다. 이게 불편함을 주는 요소였었음.
  • 이런 데이터 접근성이 중요하기 때문에 데이터 웨어하우스라는 개념이 나온 것 같기도 하다.
  • 일단 데이터를 사용 가능하게 접근할 수 있게 만들어두는 것. 그러면 이런 데이터를 조인 같은 연산을 통해서 합치는 작업으로 인해서 부가가치가 생겨난다.

 

 

Raw 한 데이터가 더 나을 수도 있다. 데이터를 해석할 수 있는 팀이 여러개라면, 데이터에 대한 관점이 다양할 것이니.

 

 

MPP 와 MapReduce 를 비교하는 것으로 배치 프로세싱에서 중요한 것이 무엇인지에 대해 알 수 있음:

  • MPP 는 실패하는 순간에 사용자에게 알려주고 사용자가 Task 를 재시도 하도록 만든다. 결과적으로 전체 Job 이 재시작 됨. 그에 반해서 MapReduce 는 실패한 부분적인 작업에 대해서 알아서 재시도를 통해 해결하려고 함. 어떤 하나의 테스크 때문에 전체 Job 이 다시 시작해야한다는 점은 자원 낭비일 수 있다.

 

2. Beyond MapReduce

MapReduce 작업을 Chaining 식으로 사용할 때 비효율성:

  • MapReduce 작업은 이전 단계의 작업이 모두 완료된 후에만 다음 단계가 시작될 수 있으므로 전체 작업 흐름의 속도를 느리게 할 수 있다.
  • 서로 다른 기계들에 작업 부하가 균등하지 않으면, 몇몇 태스크가 늦게 끝나게 되어 전체 작업의 완료 시간이 지연될 수 있다.
  • 맵퍼는 종종 리듀서가 생성한 데이터를 다시 읽어서 다음 단계로 준비하는 역할을 하는데 이는 중복으로 비효율적일 수 있다.

 

 

MapReduce 의 비효율성을 해결하기 위한 Dataflow:

  • 작업을 독립된 하위 작업으로 나눠서 각각 실행하는 대신 전체 워크플로를 하나의 작업으로 처리함.
  • 데이터플로우 엔진은 중간 상태를 파일 시스템에 저장하지 않음.
  • Dataflow 엔진은 데이터가 여러 처리 단계를 거치는 흐름을 명시적으로 모델링할 수 있음. 엄격하게 Map 과 Reduce 연산을 따르지 않아도 됨. 이를 Operator 라고 부른다. 한 연산자의 출력을 다른 연산자의 입력으로 연결하는 식으로 사용됨.
  • 장점:
    • 정렬과 같은 비싼 작업은 실제로 필요한 경우에만 수행됨. 매번 Map Reduce 마다 할 필요가 없음.
    • 중간 상태는 메모리나 로컬 디스크에 저장하면 충분하므로, HDFS에 쓰는 것보다 I/O가 적게 발생함.
    • 연산자는 입력이 준비되면 즉시 실행을 시작할 수 있으며 이전 단계가 모두 완료될 떄까지 기다리지 않아도 됨.

 

 

Fault Tolerance (MapReduce 와 Dataflow Engine):

  • MapReduce의 장애 허용성:
    • MapReduce 는 중간 상태를 분산 파일 시스템에 완전히 저장하는 방식으로 장애 허용성을 비교적 쉽게 가짐.
  • Dataflow Engine 의 장애 허용성:
    • Dataflow Engine 은 중간 상태를 HDFS에 쓰는 것을 피하므로, 장애 허용에 대해 다른 접근 방식을 취함.
    • 장애 허용성을 주는 방법은 저장된 중간 단계의 데이터를 이용하거나, 다시 재계산을 하는 방법을 사용한다.
    • Spark 는 RDD 를 이용해서 데이터의 조상을 추적할 수 있고, Flink 는 체크 포인트 기능을 이용해서 해당 연산자를 다시 사용하는 방법을 취한다.
    • 그리고 중요한 건 연산자들을 결정론적 연산을 이용하는 것. 무작위나, 시간과 관련된 비결정적 연산의 경우에는 장애 허용성을 취할 때 문제가 될 수 있음.
  • 장애 허용성을 주는 매커니즘은 Checkpoint vs Recoumputation 이다. 중간 데이터가 원본 데이터보다 훨씬 작거나 계산이 매우 CPU 집약적인 경우에는 중간 데이터를 파일에 저장흐는 것이 더 나을 수 있음.

 

 

Graphs and Iterative Processing:

  • 그래프 처리에서도 배치 처리를 하는 경우도 있음. 주로 추천 엔진이나 랭킹 시스템과 같은 머신러닝 응용 프로그램에서 필요로 함.
  • 이런 그래프 알고리즘의 처리는 종료 조건을 만날 때까지 간선을 따라 처리해나가는 반복적인 처리가 특징임.
  • 이런 반복적인 처리는 외부 스케줄러에 의해서 한단계씩 계산되면서 처리됨.
  • 하지만 이런 처리 방식은 MapReduce 연산과는 비효율적임. MapReduce 는 반복적인 특성을 고려하지 않고, 매번 전체 데이터를 읽고 완전히 새로운 출력 데이터를 생성하고 저장하는 방식이기 때문임. 매 스텝마다 새로운 데이터를 저장하고, 다시 읽는 작업은 장애 허용성을 가질진 몰라도 느리긴 함.

 

 

Pregel 처리 모델:

  • 분산 그래프 처리 모델임. 하나의 머신에서 그래프의 Vertex 상태를 모두 유지하기 어려운 경우에 사용하는 프레임워크.
  • 그래프 배치 처리를 최적화하기 위해 BSP (bulk synchronous parallel) 을 사용하며, 각 정점 (Vertex) 이 메시지를 주고 받으며 상태를 유지하는 방식으로 처리함. 상태를 유지하므로 새로운 메시지만 처리하면 되니까 효율을 높일 수 있고, 동기식으로 진행되므로 다음 스텝으로 넘어갈 때는 이전 메시지를 모두 보내게 됨
  • 메시지를 일괄 처리할 수 있어 통신 대기 시간이 줄어듬.
  • Pregel 구현은 다음 반복에서 대상 정점에서 메시지가 정확히 한 번 처리된다는 것을 보장함.

 

 

Pregel 모델의 장애 허용성(fault tolerance):

  • Pregel의 장애 허용성은 모든 정점의 상태를 주기적으로 체크포인트하여 내구성 있는 저장소에 저장함으로써 달성된다. 반복이 끝날 때 모든 정점의 상태를 주기적으로 체크포인트함.
  • 노드가 실패하여 메모리 상태가 손실되면, 가장 간단한 해결책은 마지막 체크포인트로 그래프 계산을 롤백하고 그 지점에서 계산을 다시 시작하는 것.

 

 

그래프 알고리즘의 병렬 실행(parallel execution):

  • Vertex Id 에 Vertex 는 각각의 머신으로 파티셔닝되서 실행됨. 이상적으로는 많이 통신해야 하는 Vertex 는 같은 머신에 배치되면 성능이 뛰어나긴 하지만 이런 최적화는 어려우므로 Id 를 가지고 파티셔닝 한다고 함.
  • 그래서 그래프 알고리즘은 많은 머신끼리 통신을 하기 때문에 네트워크 통신 오버헤드가 클 수 있고, 중간 상태의 메시지들은 점점 커지는 문제가 발생하기도 함.
  • 가능하다면 단일 머신이 좋긴 하지만 너무 그래프가 커진다면 Pregel 모델을 사용하는 건 어쩔 수 없다고 함.

 

 

MapReduce 이후에는 고수준 API와 선언형 언어가 등장하고 있다고 함:

  • 생산성이 더 좋아지고. 최적화를 통해서 성능적으로도 뛰어날 수 있음.

 

 

References:

  • 데이터 중심 어플리케이션 설계

+ Recent posts