이 글은 LLM Twin 프로젝트의 데이터 수집 파이프라인을 설계하고 구현하는 방법에 대해 설명함.
이 프로젝트는 실제 세계의 시나리오를 모방하여 데이터를 수집하고, 이를 머신러닝(ML) 애플리케이션에 활용할 수 있는 형태로 변환하는 과정을 다룸.
이 글은 데이터 엔지니어링의 기본 개념인 ETL(Extract, Transform, Load) 프로세스를 중심으로 설명하며, 특히 소셜 플랫폼(Medium,
Substack, GitHub 등)에서 데이터를 크롤링하고, 이를 MongoDB 데이터 웨어하우스에 저장하는 방법을 다룬다.
Overview:
LLM Twin 프로젝트 개요:
- 이 프로젝트는 실제 세계에서 데이터를 수집하고 처리하는 과정을 모방합니다.
- 데이터 수집 파이프라인을 설계하고 구현하여, 다양한 소셜 플랫폼에서 데이터를 크롤링하고, 이를 MongoDB에 저장합니다.
- 이 데이터는 이후 머신러닝 모델(LLM)의 학습 및 추론에 사용됩니다.
ETL 프로세스:
- Extract(추출): Medium, Substack, GitHub 등의 플랫폼에서 데이터를 크롤링합니다.
- Transform(변환): 크롤링한 데이터를 정제하고 표준화하여 일관된 형식으로 변환합니다.
- Load(적재): 변환된 데이터를 MongoDB 데이터 웨어하우스에 저장합니다.
데이터 수집 파이프라인 설계:
- 사용자와 링크 목록을 입력으로 받아, 각 링크의 도메인을 기반으로 적절한 크롤러를 호출합니다.
- 크롤러는 각 플랫폼에 맞게 데이터를 수집하고, 이를 MongoDB에 저장합니다.
- 크롤러는 Medium, GitHub, LinkedIn 등 다양한 플랫폼에서 데이터를 수집하며, 각각의 데이터는 Article, Repository, Post 등의 카테고리로 분류됩니다.
크롤러 구현:
- Medium 크롤러: Medium에서 데이터를 수집하여 Article 문서로 변환합니다.
- Custom Article 크롤러: 다양한 사이트에서 Article을 수집하는 일반적인 크롤러입니다.
- GitHub 크롤러: GitHub에서 Repository 데이터를 수집합니다.
- LinkedIn 크롤러: LinkedIn에서 Post 데이터를 수집합니다.
MongoDB 사용 이유:
- MongoDB는 NoSQL 데이터베이스로, 비정형 데이터(텍스트)를 저장하는 데 적합합니다.
- 소규모 데이터셋(수백 개의 문서)을 다루는 데 적합하며, 개발이 쉽고 빠릅니다.
- 대규모 데이터셋(수백만 개 이상의 문서)을 다룰 때는 Snowflake나 BigQuery와 같은 전용 데이터 웨어하우스를 사용하는 것이 이상적입니다.
ETL 파이프라인과 Feature 파이프라인의 관계:
- ETL 파이프라인은 MongoDB에 데이터를 저장하고, Feature 파이프라인은 이 데이터를 추가로 정제하고 처리하여 Qdrant 벡터 데이터베이스에 저장합니다.
- 두 파이프라인은 MongoDB를 통해 독립적으로 통신하며, 서로 다른 스케줄로 실행될 수 있습니다
향후 확장성:
- 현재는 소규모 데이터셋을 다루지만, 향후 더 많은 데이터 소스를 추가하여 대규모 데이터셋을 구축할 수 있습니다.
- 이 아키텍처는 새로운 데이터 소스를 쉽게 추가할 수 있도록 설계되어 있습니다.
LLM Twin 프로젝트의 데이터 수집 파이프라인 구현
이 부분에서는 ZenML을 사용하여 데이터 수집 파이프라인을 어떻게 구현하는지, 그리고 각 단계에서 어떤 작업이 이루어지는지 상세히 다룹니다.
특히 ZenML 파이프라인과 크롤러 디스패처(CrawlerDispatcher)의 구현에 초점을 맞추고 있습니다.
ZenML 파이프라인 소개:
- ZenML은 머신러닝 파이프라인을 구성하고 실행하는 데 사용되는 도구입니다.
- digital_data_etl 파이프라인은 사용자의 전체 이름과 크롤링할 링크 목록을 입력으로 받아 데이터 수집 작업을 수행합니다.
- 이 파이프라인은 두 가지 주요 단계로 구성됩니다:
- get_or_create_user: 사용자를 데이터베이스에서 조회하거나 새로 생성합니다.
- crawl_links: 제공된 링크를 크롤링하여 데이터를 수집합니다.
- get_or_create_user 단계:
- 크롤링 한 문서의 저작자를 기록하기 위함임.
- 이 단계는 사용자의 전체 이름을 입력으로 받아 MongoDB 데이터베이스에서 해당 사용자를 조회하거나, 존재하지 않으면 새로 생성합니다.
- 사용자의 전체 이름을 이름과 성으로 분리한 후, 데이터베이스에서 조회하거나 새로 생성합니다.
- ZenML의 step_context를 사용하여 출력 메타데이터를 추가합니다. 이 메타데이터는 사용자 정보와 쿼리 파라미터를 포함합니다.
- crawl_links 단계:
- 이 단계는 사용자와 링크 목록을 입력으로 받아 각 링크를 크롤링합니다.
- CrawlerDispatcher를 사용하여 링크의 도메인에 따라 적절한 크롤러를 선택합니다.
- 크롤링 작업은 각 링크에 대해 독립적으로 수행되며, 성공적으로 크롤링된 링크의 수와 메타데이터를 기록합니다.
- 크롤링 중 발생하는 예외는 적절히 처리됩니다.
크롤러 디스패처(CrawlerDispatcher):
- CrawlerDispatcher는 링크의 도메인을 기반으로 적절한 크롤러를 선택하는 역할을 합니다.
- 예를 들어, LinkedIn, Medium, GitHub 등의 도메인에 따라 각각의 크롤러를 등록하고 사용합니다.
- 크롤러의 extract() 메서드를 호출하여 링크에서 데이터를 추출하고, 이를 MongoDB에 저장합니다.
메타데이터 관리:
- 크롤링 작업의 결과는 메타데이터로 기록됩니다. 이 메타데이터는 각 도메인별로 성공적으로 크롤링된 링크의 수와 총 시도 횟수를 포함합니다.
- 이 메타데이터는 ZenML의 step_context를 통해 출력 아티팩트에 추가됩니다
크롤링 로직:
- 각 링크에 대해 _crawl_link() 함수가 호출되어 크롤링을 시도합니다.
- 크롤링 중 예외가 발생하면 로그에 기록되고, 크롤링 실패로 간주됩니다.
- 크롤링이 성공하면 메타데이터가 업데이트되고, 성공 횟수가 증가합니다.
LLM Twin 프로젝트의 데이터 수집 파이프라인 구현 - 코드 구형 상세 설명
get_or_create_user 단계:
@step
def get_or_create_user(user_full_name: str) -> Annotated[UserDocument, "user"]:
logger.info(f"Getting or creating user: {user_full_name}")
first_name, last_name = utils.split_user_full_name(user_full_name)
user = UserDocument.get_or_create(first_name=first_name, last_name=last_name)
step_context = get_step_context()
step_context.add_output_metadata(output_name="user", metadata=_get_metadata(user_full_name, user))
return user
- 사용자의 전체 이름을 이름과 성으로 분리합니다.
- 데이터베이스에서 사용자를 조회하거나 새로 생성합니다.
- ZenML의 step_context를 사용하여 메타데이터를 추가합니다.
crawl_links 단계:
@step
def crawl_links(user: UserDocument, links: list[str]) -> Annotated[list[str], "crawled_links"]:
dispatcher = CrawlerDispatcher.build().register_linkedin().register_medium().register_github()
logger.info(f"Starting to crawl {len(links)} link(s).")
metadata = {}
successful_crawls = 0
for link in tqdm(links):
successful_crawl, crawled_domain = _crawl_link(dispatcher, link, user)
successful_crawls += successful_crawl
metadata = _add_to_metadata(metadata, crawled_domain, successful_crawl)
step_context = get_step_context()
step_context.add_output_metadata(output_name="crawled_links", metadata=metadata)
logger.info(f"Successfully crawled {successful_crawls} / {len(links)}")
- CrawlerDispatcher를 초기화하고 LinkedIn, Medium, GitHub 크롤러를 등록합니다.
- 각 링크에 대해 크롤링을 시도하고, 성공 여부를 기록합니다.
- 메타데이터를 업데이트하고 ZenML의 step_context를 통해 출력 아티팩트에 추가합니다.
크롤링 로직:
def _crawl_link(dispatcher: CrawlerDispatcher, link: str, user: UserDocument) -> tuple[bool, str]:
crawler = dispatcher.get_crawler(link)
crawler_domain = urlparse(link).netloc
try:
crawler.extract(link=link, user=user)
return (True, crawler_domain)
except Exception as e:
logger.error(f"An error occurred while crawling: {e}")
return (False, crawler_domain)
- 링크의 도메인을 기반으로 적절한 크롤러를 선택합니다.
- 크롤링 중 예외가 발생하면 로그에 기록하고 실패로 간주합니다.
메타데이터 업데이트:
def _add_to_metadata(metadata: dict, domain: str, successful_crawl: bool) -> dict:
if domain not in metadata:
metadata[domain] = {}
metadata[domain]["successful"] = metadata.get(domain, {}).get("successful", 0) + successful_crawl
metadata[domain]["total"] = metadata.get(domain, {}).get("total", 0) + 1
return metadata
CrawlerDispatcher 클래스의 구현과 역할
이 클래스는 제공된 링크의 도메인을 분석하여 적절한 크롤러를 선택하고 초기화하는 중간 계층 역할을 합니다.
이는 크롤링 로직의 핵심 부분으로, 다양한 플랫폼(Medium, LinkedIn, GitHub 등)에서 데이터를 수집하기 위해 각 플랫폼에 맞는 크롤러를 동적으로 선택합니다.
CrawlerDispatcher의 역할:
- CrawlerDispatcher는 제공된 링크의 도메인을 분석하여 적절한 크롤러를 선택합니다.
- 예를 들어, https://medium.com 링크는 MediumCrawler를, https://github.com 링크는 GithubCrawler를 사용합니다.
- 이 클래스는 크롤링 로직의 중간 계층으로, 링크와 크롤러 사이의 매핑을 관리합니다.
CrawlerDispatcher의 구조:
- 크롤러 등록: Medium, LinkedIn, GitHub 등의 플랫폼에 대한 크롤러를 등록합니다.
- 크롤러 선택: 제공된 링크의 도메인을 분석하여 등록된 크롤러 중 적절한 것을 선택합니다.
- 기본 크롤러: 등록된 크롤러가 없는 경우, 기본적으로 CustomArticleCrawler를 사용합니다.
크롤러 등록 메서드:
- register_medium(), register_linkedin(), register_github() 등의 메서드를 통해 각 플랫폼에 대한 크롤러를 등록합니다.
- 이 메서드들은 빌더 패턴(Builder Pattern) 을 따르며, 메서드 체이닝을 통해 여러 크롤러를 한 번에 등록할 수 있습니다.
크롤러 선택 로직:
- get_crawler() 메서드는 제공된 링크의 도메인을 분석하여 등록된 크롤러 중 적절한 것을 선택합니다.
- 만약 등록된 크롤러가 없는 경우, CustomArticleCrawler를 기본값으로 사용합니다.
정규 표현식을 사용한 도메인 매칭:
- 도메인을 정규 표현식으로 정규화하여, 다양한 형태의 URL(예: www 포함 여부, 하위 경로 등)을 처리할 수 있도록 합니다.
CrawlerDispatcher 클래스의 구현과 역할 - 코드 구현 상세 설명
크롤러 등록:
def register(self, domain: str, crawler: type[BaseCrawler]) -> None:
parsed_domain = urlparse(domain)
domain = parsed_domain.netloc
self._crawlers[fr"https://(www\.)?{re.escape(domain)}/*"] = crawler
- urlparse를 사용하여 도메인을 정규화합니다.
- domain(예: https://medium.com)을 구성 요소로 분해합니다.
- parsed_domain.netloc를 통해 도메인(medium.com)을 추출합니다.
- 정규 표현식을 사용하여 도메인 패턴을 생성하고, 이를 크롤러와 매핑합니다.
- 도메인(medium.com)을 정규 표현식에서 안전하게 사용할 수 있도록 이스케이프 처리합니다
- 정규표현식에서 . 은 모든 문자를 나타내므로 이를 이스케이프 처리해서 문자 . 그대로 사용함.
- 예를 들어, https://medium.com은 https://(www.)?medium.com/* 패턴으로 변환됩니다.
배경지식 - urlparse:
- urlparse는 Python의 urllib.parse 모듈에 포함된 함수로, URL을 구성 요소로 분해합니다. 이 함수는 URL을 다음과 같은 구성 요소로 나눕니다:
- scheme: 프로토콜 (예: http, https)
- netloc: 네트워크 위치 (예: www.example.com)
- path: 경로 (예: /path/to/resource)
- params: 파라미터 (거의 사용되지 않음)
- query: 쿼리 문자열 (예: ?key=value)
- fragment: 프래그먼트 (예: #section1)
- 예: (https://medium.com)
- ParseResult(scheme='https', netloc='medium.com', path='', params='', query='', fragment='')
크롤러 선택:
def get_crawler(self, url: str) -> BaseCrawler:
for pattern, crawler in self._crawlers.items():
if re.match(pattern, url):
return crawler()
else:
logger.warning(f"No crawler found for {url}. Defaulting to CustomArticleCrawler.")
return CustomArticleCrawler()
- 제공된 URL을 정규 표현식 패턴과 비교하여 적절한 크롤러를 선택합니다.
- 매칭되는 크롤러가 없는 경우, CustomArticleCrawler를 기본값으로 반환합니다.
빌더 패턴을 사용한 크롤러 등록:
def register_medium(self) -> "CrawlerDispatcher":
self.register("https://medium.com", MediumCrawler)
return self
def register_linkedin(self) -> "CrawlerDispatcher":
self.register("https://linkedin.com", LinkedInCrawler)
return self
def register_github(self) -> "CrawlerDispatcher":
self.register("https://github.com", GithubCrawler)
return self
- 빌더 패턴은 복잡한 객체를 생성하는 과정을 단순화하고, 객체 생성 과정을 더 직관적이고 유연하게 만드는 데 유용한 디자인 패턴
- 빌더 패턴을 사용하면 객체 생성 과정을 메서드 체이닝(method chaining) 방식으로 표현할 수 있습니다. 이는 코드를 읽는 사람에게 객체 생성 과정을 더 직관적으로 이해할 수 있게 해줍니다.
- 빌더 패턴은 새로운 크롤러를 추가하거나 기존 크롤러를 수정하는 작업을 쉽게 만듭니다. 새로운 크롤러를 추가하려면 단순히 새로운 register_*() 메서드를 정의하면 됩니다.
크롤러의 기본 클래스와 Selenium을 사용하는 크롤러의 기본 클래스
이 부분은 모든 크롤러가 공통적으로 따르는 인터페이스를 정의하고, Selenium을 사용하는 크롤러의 공통 기능을 구현하는 데 초점을 맞추고 있습니다.
크롤러의 기본 클래스: BaseCrawler
- 모든 크롤러는 BaseCrawler라는 추상 클래스를 상속받아 구현됩니다. 이 클래스는 크롤러의 공통 인터페이스를 정의하며, 특히 extract() 메서드를 통해 크롤링 로직을 표준화합니다.
- 주요 특징:
- extract() 메서드: 모든 크롤러는 이 메서드를 구현해야 합니다. 이 메서드는 크롤링할 링크를 입력으로 받아 데이터를 추출합니다.
- model 속성: 크롤링된 데이터를 MongoDB에 저장할 때 사용할 문서 유형을 정의합니다. 이는 각 크롤러가 다른 데이터 카테고리(예: Article, Repository, Post)를 처리할 수 있도록 합니다.
from abc import ABC, abstractmethod
class BaseCrawler(ABC):
model: type[NoSQLBaseDocument]
@abstractmethod
def extract(self, link: str, **kwargs) -> None: ...
- ABC는 추상 클래스를 정의하기 위해 사용됩니다.
- @abstractmethod 데코레이터는 하위 클래스에서 반드시 구현해야 하는 메서드를 표시합니다.
- extract 타입이 None 인 이유:
- BaseCrawler의 extract() 메서드가 None을 반환하는 이유는 크롤링된 데이터를 직접 반환하지 않고, 데이터베이스에 저장하는 역할을 하기 때문입니다. 이 설계는 크롤링된 데이터를 즉시 처리하거나 반환하는 대신, 데이터베이스에 저장하는 방식으로 동작하도록 설계되었습니다.
Selenium을 사용하는 크롤러의 기본 클래스: BaseSeleniumCrawler:
- Selenium은 웹 브라우저를 자동화하는 도구로, 동적으로 로드되는 웹 페이지(예: Medium, LinkedIn)에서 데이터를 수집할 때 사용됩니다. BaseSeleniumCrawler는 Selenium을 사용하는 크롤러의 공통 기능을 제공합니다.
- 동적으로 로드되는 웹 페이지는 사용자가 페이지를 처음 요청했을 때 모든 콘텐츠가 한번에 로드되지 않고 사용자와의 상호작용 (스크롤, 버튼,) 이후에 콘텐추가 점진적으로 로드되는 페이지를 말함.
- 동적 로딩의 예시
- 무한 스크롤 (Infinite Scroll)
- 사용자가 페이지를 스크롤하면, 새로운 콘텐츠가 자동으로 로드됩니다.
- 예: Facebook, Instagram, LinkedIn의 피드.
- 탭 전환
- 사용자가 탭을 클릭하면, 해당 탭의 콘텐츠가 동적으로 로드됩니다.
- 예: 웹사이트의 제품 카테고리 탭.
- 모달 창
- 사용자가 버튼을 클릭하면, 새로운 창(모달)이 동적으로 로드됩니다.
- 예: 로그인 창, 상세 정보 창.
- 무한 스크롤 (Infinite Scroll)
- 주요 특징:
- Chrome 옵션 설정: Selenium을 사용할 때 Chrome 브라우저의 설정을 최적화합니다. 예를 들어, GPU 렌더링 비활성화, 팝업 차단, 알림 비활성화 등을 설정합니다.
- 스크롤 기능: LinkedIn과 같은 플랫폼에서 스크롤을 통해 추가 콘텐츠를 로드하는 기능을 제공합니다.
- 로그인 기능: 각 플랫폼의 로그인 페이지는 HTML 구조가 다르기 때문에, 하위 클래스에서 login() 메서드를 오버라이드하여 구현합니다.
class BaseSeleniumCrawler(BaseCrawler, ABC):
def __init__(self, scroll_limit: int = 5) -> None:
options = webdriver.ChromeOptions()
options.add_argument("--no-sandbox")
options.add_argument("--headless=new")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--log-level=3")
options.add_argument("--disable-popup-blocking")
options.add_argument("--disable-notifications")
options.add_argument("--disable-extensions")
options.add_argument("--disable-background-networking")
options.add_argument("--ignore-certificate-errors")
options.add_argument(f"--user-data-dir={mkdtemp()}")
options.add_argument(f"--data-path={mkdtemp()}")
options.add_argument(f"--disk-cache-dir={mkdtemp()}")
options.add_argument("--remote-debugging-port=9226")
self.set_extra_driver_options(options)
self.scroll_limit = scroll_limit
self.driver = webdriver.Chrome(options=options)
- Chrome 옵션: Selenium이 사용할 Chrome 브라우저의 설정을 정의합니다. 예를 들어, --headless=new는 브라우저를 백그라운드에서 실행하도록 합니다.
스크롤 기능: scroll_page()
- 동적으로 로드되는 웹 페이지(예: LinkedIn 피드)에서 데이터를 수집하려면 스크롤 기능이 필요합니다. scroll_page() 메서드는 페이지를 스크롤하며 새로운 콘텐츠가 로드될 때까지 기다립니다.
def scroll_page(self) -> None:
current_scroll = 0
last_height = self.driver.execute_script("return document.body.scrollHeight")
while True:
self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(5)
new_height = self.driver.execute_script("return document.body.scrollHeight")
if new_height == last_height or (self.scroll_limit and current_scroll >= self.scroll_limit):
break
last_height = new_height
current_scroll += 1
- execute_script(): JavaScript 코드를 실행하여 페이지를 스크롤합니다.
- scroll_limit: 스크롤 횟수를 제한하여 무한 스크롤을 방지합니다.
셀레니움 - 하위 클래스에서 구현할 메서드:
- set_extra_driver_options(): 하위 클래스에서 추가적인 Chrome 옵션을 설정할 수 있습니다.
- login(): 각 플랫폼의 로그인 페이지는 HTML 구조가 다르기 때문에, 하위 클래스에서 이 메서드를 구현해야 합니다.
def set_extra_driver_options(self, options: Options) -> None:
pass
def login(self) -> None:
pass
구체적인 크롤러 구현
- 이제 BaseCrawler와 BaseSeleniumCrawler를 상속받아 구체적인 크롤러를 구현할 수 있습니다. 예를 들어:
- GitHubCrawler: GitHub에서 데이터를 수집합니다.
- CustomArticleCrawler: 일반적인 웹사이트에서 아티클을 수집합니다.
- MediumCrawler: Medium에서 아티클을 수집합니다.
GitHubCrawler 클래스의 구현
이 클래스는 GitHub 저장소를 크롤링하고, 저장소의 파일 내용을 MongoDB에 저장하는 역할을 합니다.
Selenium을 사용하지 않고, Git의 clone 기능을 활용하여 저장소를 로컬에 복제한 후 파일을 처리합니다.
GitHubCrawler 클래스의 목적:
- GitHub 저장소를 크롤링하여 파일 내용을 수집합니다.
- 수집된 데이터를 MongoDB에 저장합니다.
- 중복 크롤링을 방지하기 위해 이미 처리된 저장소는 건너뜁니다
클래스 구조 - 초기화(__init__)
- model: 크롤링된 데이터를 MongoDB에 저장할 때 사용할 문서 모델(RepositoryDocument)을 지정합니다
- ignore: 크롤링에서 제외할 파일 및 디렉토리 패턴을 지정합니다. 예를 들어, .git, .toml, .lock, .png 파일은 크롤링하지 않습니다.
class GithubCrawler(BaseCrawler):
model = RepositoryDocument
def __init__(self, ignore=(".git", ".toml", ".lock", ".png")) -> None:
super().__init__()
self._ignore = ignore
배경지식 - 튜플:
- 클래스 초기화 코드에서 기본값으로 튜플을 사용한 이유:
- 튜플은 불변 시퀀스 타입
- 튜플은 생성한 후 수정할 수 없음. Immutable 함.
- 튜플은 리스트보다 메모리 사용량이 적고, 접근 속도가 빠르다.
- 튜플은 주로 고정된 값의 집합을 나타내는데 사용됨.
- 튜플은 순서가 있음. 따라서 인덱스를 사용해서 접근 가능.
- 튜플은 중복된 값을 허용할 수 있음.
- 튜플은 다양한 데이터 타입을 저장할 수 있음.
클래스 구조 - extaract method():
- extract() 메서드는 GitHub 저장소를 크롤링하고, 데이터를 MongoDB에 저장하는 주요 로직을 포함합니다.
def extract(self, link: str, **kwargs) -> None:
old_model = self.model.Find(link=link)
if old_model is not None:
logger.info(f"Repository already exists in the database: {link}")
return
- 중복 크롤링 방지 코드:
logger.info(f"Starting scrapping GitHub repository: {link}")
repo_name = link.rstrip("/").split("/")[-1]
local_temp = tempfile.mkdtemp()
- 저장소 이름 추출 및 임시 디렉토리 생성:
- repo_name: GitHub 저장소 링크에서 저장소 이름을 추출합니다. 예를 들어, https://github.com/user/repo에서 repo를 추출합니다
- local_temp: 임시 디렉토리를 생성하여 저장소를 복제할 공간을 만듭니다.
try:
os.chdir(local_temp)
subprocess.run(["git", "clone", link])
- 저장소 복제:
- os.chdir(local_temp): 현재 작업 디렉토리를 임시 디렉토리로 변경합니다.
- subprocess.run(["git", "clone", link]): Git 명령어를 사용하여 저장소를 복제합니다.
repo_path = os.path.join(local_temp, os.listdir(local_temp)[0])
tree = {}
for root, _, files in os.walk(repo_path):
dir = root.replace(repo_path, "").lstrip("/")
if dir.startswith(self._ignore):
continue
for file in files:
if file.endswith(self._ignore):
continue
file_path = os.path.join(dir, file)
with open(os.path.join(root, file), "r", errors="ignore") as f:
tree[file_path] = f.read().replace(" ", "")
- repo_path: 복제된 저장소의 경로를 설정합니다.
- os.walk(repo_path): 저장소의 디렉토리 트리를 탐색합니다.
- self._ignore: 무시할 파일 및 디렉토리를 필터링합니다.
- tree[file_path]: 파일 경로를 키로, 파일 내용을 값으로 하는 딕셔너리를 생성합니다.
user = kwargs["user"]
instance = self.model(
content=tree,
name=repo_name,
link=link,
platform="github",
author_id=user.id,
author_full_name=user.full_name,
)
instance.save()
- self.model: RepositoryDocument 모델을 사용하여 크롤링된 데이터를 MongoDB에 저장합니다.
- content: 파일 내용을 저장합니다.
- name: 저장소 이름을 저장합니다.
- link: 저장소 링크를 저장합니다.
- platform: 플랫폼 정보(github)를 저장합니다.
- author_id 및 author_full_name: 저장소 작성자 정보를 저장합니다.
except Exception:
raise
finally:
shutil.rmtree(local_temp)
logger.info(f"Finished scrapping GitHub repository: {link}")
- shutil.rmtree(local_temp): 임시 디렉토리를 삭제하여 리소스를 정리합니다.
- finally: 예외가 발생하더라도 임시 디렉토리는 반드시 정리됩니다.
CustomArticleCrawler 클래스의 구현
이 클래스는 웹사이트에서 아티클(기사)을 크롤링하고, HTML을 텍스트로 변환한 후 MongoDB에 저장하는 역할을 합니다.
특히, langchain_community 패키지의 AsyncHtmlLoader와 Html2TextTransformer를 사용하여 HTML을 로드하고 텍스트로 변환합니다.
CustomArticleCrawler 클래스의 목적
- 웹사이트에서 아티클을 크롤링합니다.
- HTML을 텍스트로 변환하여 데이터를 추출합니다.
- 추출된 데이터를 MongoDB에 저장합니다.
- 중복 크롤링을 방지하기 위해 이미 처리된 아티클은 건너뜁니다.
클래스 구조 - 초기화:
- BaseCrawler 상속: CustomArticleCrawler는 BaseCrawler를 상속받아 크롤링 로직을 구현합니다.
- model: 크롤링된 데이터를 MongoDB에 저장할 때 사용할 문서 모델(ArticleDocument)을 지정합니다.
class CustomArticleCrawler(BaseCrawler):
model = ArticleDocument
클래스 구조 - extract 메소드 - 중복 크롤링 방지:
- self.model.find(link=link): MongoDB에서 해당 링크의 아티클이 이미 존재하는지 확인합니다.
- 만약 이미 존재한다면, 크롤링을 중단하고 메서드를 종료합니다.
def extract(self, link: str, **kwargs) -> None:
old_model = self.model.find(link=link)
if old_model is not None:
logger.info(f"Article already exists in the database: {link}")
return
클래스 구조 - extract 메소드 - HTML 로드 및 텍스트 변환:
- AsyncHtmlLoader: 비동기적으로 HTML을 로드합니다.
- Html2TextTransformer: HTML을 텍스트로 변환합니다.
- docs_transformed[0]: 변환된 문서 중 첫 번째 문서를 사용합니다.
logger.info(f"Starting scrapping article: {link}")
loader = AsyncHtmlLoader([link])
docs = loader.load()
html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(docs)
doc_transformed = docs_transformed[0]
클래스 구조 - extract 메소드 - 컨텐츠 추출:
- Title: 아티클의 제목을 추출합니다.
- Subtitle: 아티클의 부제목을 추출합니다.
- Content: 아티클의 본문을 추출합니다.
- language: 아티클의 언어를 추출합니다.
content = {
"Title": doc_transformed.metadata.get("title"),
"Subtitle": doc_transformed.metadata.get("description"),
"Content": doc_transformed.page_content,
"language": doc_transformed.metadata.get("language"),
}
클래스 구조 - extract 메소드 - 플랫폼 정보 추출:
parsed_url = urlparse(link)
platform = parsed_url.netloc
클래스 구조 - extract 메소드 - MongoDB 에 저장:
- self.model: ArticleDocument 모델을 사용하여 크롤링된 데이터를 MongoDB에 저장합니다.
- content: 아티클의 콘텐츠를 저장합니다.
- link: 아티클의 링크를 저장합니다.
- platform: 플랫폼 정보를 저장합니다.
- author_id 및 author_full_name: 아티클 작성자 정보를 저장합니다.
user = kwargs["user"]
instance = self.model(
content=content,
link=link,
platform=platform,
author_id=user.id,
author_full_name=user.full_name,
)
instance.save()
logger.info(f"Finished scrapping custom article: {link}")
MediumCrawler 클래스 설명
이 클래스는 Selenium과 BeautifulSoup을 활용한 Medium 웹사이트 크롤러를 구현한 것.
MediumCrawler 클래스는 BaseSeleniumCrawler를 상속받으며, Medium에서 특정 기사(Article)를 찾아 크롤링한 후 데이터베이스에 저장하는 역할을 합니다.
클래스 정의:
- MediumCrawler 는 BaseSeleniumCrawler 를 상속받은 크롤러 클래스입니다.
- model 속성에 ArticleDocument 를 할당해두어, 크롤링한 데이터를 이 모델 구조에 맞춰 저장할 수 있습니다.
class MediumCrawler(BaseSeleniumCrawler):
model = ArticleDocument
드라이버 옵션 설정:
- set_extra_driver_options 메서드는 Selenium WebDriver에 추가 옵션을 설정하는 역할을 합니다.
- 여기서는 --profile-directory=Profile 2 인자를 추가해, 특정 브라우저 프로필을 사용하도록 설정합니다. 예를 들어, Chrome 프로필 중 ‘Profile 2’를 불러오게끔 하는 식입니다.
def set_extra_driver_options(self, options) -> None:
options.add_argument(r"--profile-directory=Profile 2")
콘텐츠 추출 로직 (extract 메서드):
- DB 중복 체크:
- self.model.find(link=link)로 데이터베이스에서 해당 링크가 이미 존재하는지 확인합니다.
- 이미 저장된 기사라면, return으로 함수를 종료합니다.
- 페이지 로드 및 스크롤:
- 중복이 아니라면 로그를 남기고, self.driver.get(link)를 통해 해당 URL로 이동합니다.
- self.scroll_page() 호출로 페이지를 아래로 스크롤해, Lazy Loading 등으로 인해 늦게 로드되는 콘텐츠까지 모두 불러옵니다.
def extract(self, link: str, **kwargs) -> None:
old_model = self.model.find(link=link)
if old_model is not None:
logger.info(f"Article already exists in the database: {link}")
return
logger.info(f"Starting scrapping Medium article: {link}")
self.driver.get(link)
self.scroll_page()
BeautifulSoup을 이용한 HTML 파싱:
- self.driver.page_source로 현재 렌더링된 웹페이지의 HTML을 가져온 뒤, BeautifulSoup(BS4)로 파싱합니다.
- 원하는 요소들을 찾기 위해 find_all 함수를 사용합니다. 예를 들어:
- 제목(title)은 <h1> 태그와 클래스명 "pw-post-title"를 기준으로 찾습니다.
- 부제목(subtitle)은 <h2> 태그와 클래스명 "pw-subtitle-paragraph"를 기준으로 찾습니다.
- 제목(title)은 <h1> 태그와 클래스명 "pw-post-title"를 기준으로 찾습니다.
- soup.get_text() 함수를 통해 HTML 태그를 제외한 텍스트를 추출함. 그리고 추출한 데이터를 data 딕셔너리에 담아둡니다
soup = BeautifulSoup(self.driver.page_source, "html.parser")
title = soup.find_all("h1", class_="pw-post-title")
subtitle = soup.find_all("h2", class_="pw-subtitle-paragraph")
data = {
"Title": title[0].string if title else None,
"Subtitle": subtitle[0].string if subtitle else None,
"Content": soup.get_text(),
}
드라이버 닫기 및 데이터베이스 저장:
- Selenium 드라이버 종료
- 크롤링이 끝났으므로, 시스템 리소스를 절약하기 위해 self.driver.close() 로 드라이버를 닫습니다.
- DB 모델 인스턴스 생성 및 저장
- user 정보를 매개변수에서 받아오고, ArticleDocument 모델의 인스턴스를 생성합니다.
- platform, content, link, author_id, author_full_name 같은 필드를 채워넣고 instance.save() 로 데이터베이스에 저장합니다.
- 저장이 완료된 후 성공 메시지를 로그로 남깁니다.
self.driver.close()
user = kwargs["user"]
instance = self.model(
platform="medium",
content=data,
link=link,
author_id=user.id,
author_full_name=user.full_name,
)
instance.save()
logger.info(f"Successfully scraped and saved article: {link}")
The NoSQL data warehouse documents
세 가지 주요 문서 클래스를 구현하여 데이터 카테고리(예: 기사, 게시글, 저장소)에 필요한 속성을 정의합니다.
- ArticleDocument: Medium 기사와 같은 텍스트 기반 콘텐츠
- PostDocument: 소셜 미디어나 포럼의 게시글
- RepositoryDocument: GitHub와 같은 저장소 관련 정보
하나의 문서 타입을 사용하는 것이 아니라 이렇게 구별한 이유는 런타임 에러를 막기 위해서임. 데이터 항목에 특정 속성 값이 없을 경우 런타임 에러가 발생할건데, 이렇게 문서 별로 가져야 할 속성을 명시하면 이 에러를 방지할 수 있음.
문서 모델 구조는 NoSQLBaseDocument -> Document -> (ArticleDocument, PostDocument, RepositoryDocument) 로 상속 구조가 됨.
NoSQLBaseDocument 에 저장하고 불러오는 기능이 다 있기 때문에 몽고 DB 와 상호작용하는 코드는 하나만 작성하면 됨.
The ORM and ODM software patterns
이 글에서는 ORM 패턴에 대한 상기를 바탕으로 이 패러다임을 NoSQL 데이터베이스 중 하나인 MongoDB 에 적용하는 걸 보여줄거임.
ORM 이란:
- ORM은 데이터베이스와 상호작용할 때 SQL이나 특정 API 쿼리를 직접 작성하는 대신, 객체 지향 패러다임을 사용해 데이터의 조회, 삽입, 수정, 삭제(CRUD) 등의 작업을 수행할 수 있도록 도와주는 기법입니다.
- 주요 장점:
- 복잡성 추상화: 데이터베이스 작업의 복잡한 로직을 ORM 클래스 내부에 캡슐화하여, 사용자가 직접 SQL을 작성할 필요 없이 간단한 객체 조작만으로 작업할 수 있습니다.
- 코드 중복 감소: 반복적인 보일러플레이트 코드를 줄여 개발 생산성을 높입니다.
- 대상 데이터베이스:
- ORM은 일반적으로 PostgreSQL, MySQL과 같은 SQL 데이터베이스와 함께 사용됩니다.
Python에서의 ORM 활용 예시 (SQLAlchemy):
- SQLALchemy 는 Python에서 가장 널리 사용되는 ORM 라이브러리로, SQL 쿼리를 직접 작성하지 않고도 데이터베이스와 상호작용할 수 있게 해줍니다.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
# User ORM: users 테이블과 매핑되는 클래스를 정의합니다.
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
- 위 코드는 User 클래스를 정의하여, SQL 데이터베이스 내의 users 테이블에 매핑합니다.
# 데이터베이스 엔진 생성 (여기서는 메모리 내 SQLite 사용)
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
# 데이터베이스와 상호작용할 세션 생성
Session = sessionmaker(bind=engine)
session = Session()
# 새로운 유저 추가
new_user = User(name="Alice")
session.add(new_user)
session.commit()
- 이 코드는 새로운 유저를 추가하고 커밋하여 데이터베이스에 저장하는 과정을 보여줍니다.
# 유저 조회
user = session.query(User).first()
if user:
print(f"User ID: {user.id}")
print(f"User name: {user.name}")
- 위 예시는 SQLAlchemy를 이용해 데이터베이스에서 유저를 조회하는 방법을 설명합니다.
ODM (Object-Document Mapping) 패턴:
- ODM은 ORM과 유사한 개념이지만, SQL 데이터베이스 대신 NoSQL 데이터베이스(예: MongoDB)와의 상호작용에 사용됩니다.
- 주요 특징:
- NoSQL 데이터베이스는 테이블 대신 컬렉션을 사용하고, 각 컬렉션은 JSON과 유사한 문서(document) 를 저장합니다.
- ODM은 이러한 JSON 형태의 문서를 객체 지향 프로그래밍 방식으로 다루도록 해줍니다.
- 목표:
- ODM은 객체 지향 코드와 NoSQL 데이터베이스의 JSON 문서 사이의 매핑을 단순화하고, CRUD 작업을 쉽게 수행할 수 있도록 합니다.
Implementing the ODM class
ODM 클래스 구현의 목적과 개요:
- 목적:
- ODM(Object-Document Mapping)은 SQL 데이터베이스의 ORM과 유사하게, NoSQL 데이터베이스(여기서는 MongoDB)와의 상호작용을 객체 지향 방식으로 단순화하기 위한 패턴입니다.
- 이 글에서는 NoSQLBaseDocument라는 기본 ODM 클래스를 처음부터 직접 구현하는 과정을 소개합니다.
- 이를 통해 모듈화되고 재사용 가능한 Python 클래스를 작성하는 연습을 하며, 나아가 프로덕션 코드에 필요한 OOP 원칙들을 익히게 됩니다.
- 기본 아이디어:
- 모든 문서(예: ArticleDocument, PostDocument 등)는 이 NoSQLBaseDocument를 상속받아 MongoDB와의 CRUD(생성, 조회, 업데이트, 삭제) 작업을 쉽게 수행할 수 있도록 합니다.
데이터베이스 연결 및 필수 모듈 설정
- 모듈 임포트와 데이터베이스 연결:
- 다양한 필수 모듈을 임포트합니다.
- pydantic를 사용해 모델의 타입 검증을 지원하고, pymongo를 통해 MongoDB와 통신합니다.
- _database 변수는 설정 파일(settings)에 명시된 데이터베이스(기본 이름은 "twin")와의 연결을 담당합니다.
import uuid
from abc import ABC
from typing import Generic, Type, TypeVar
from loguru import logger
from pydantic import UUID4, BaseModel, Field
from pymongo import errors
from llm_engineering.domain.exceptions import ImproperlyConfigured
from llm_engineering.infrastructure.db.mongo import connection
from llm_engineering.settings import settings
_database = connection.get_database(settings.DATABASE_NAME)
제네릭 타입 변수 T 선언:
- 이 변수는 NoSQLBaseDocument를 상속받는 모든 클래스에 대해 일반화된 타입으로 활용됩니다.
- 예를 들어, ArticleDocument가 NoSQLBaseDocument를 상속받으면, 이때 T는 ArticleDocument 타입으로 대체됩니다.
T = TypeVar("T", bound="NoSQLBaseDocument")
NoSQLBaseDocument 클래스 선언:
- 이 클래스는 Pydantic의 BaseModel, Generic, 그리고 ABC(추상 클래스) 를 상속받아 ODM의 기본 뼈대를 구성합니다.
- 모든 ODM 문서 클래스들은 이 클래스를 상속받아 공통 기능(예: 데이터 변환, 저장, 조회 등)을 공유하게 됩니다.
class NoSQLBaseDocument(BaseModel, Generic[T], ABC):
배경지식 - 제네릭 이해하기:
- TypeVar란?
- TypeVar는 Python의 typing 모듈에서 제공하는 기능으로, 제네릭(Generic) 프로그래밍에서 사용되는 타입 변수입니다.
- 예: 만약 리스트에 들어가는 요소의 타입을 일반화하고 싶다면 T와 같은 타입 변수를 정의하고, 리스트의 요소 타입으로 사용합니다.
- bound="NoSQLBaseDocument"의 의미
- bound 인자는 해당 타입 변수가 가질 수 있는 상한(boundary) 를 지정합니다.
T = TypeVar("T", bound="NoSQLBaseDocument")
이 코드는 T 타입 변수는 반드시 NoSQLBaseDocument 또는 그 서브클래스여야 한다는 제약을 부여합니다.- 즉, T로 사용될 타입은 NoSQLBaseDocument의 모든 기능을 가지고 있다는 보장이 있어, 이후에 메서드에서 T를 사용해도 NoSQLBaseDocument에 정의된 기능을 안전하게 사용할 수 있습니다.
- Generic[T] 클래스를 상속받는 이유
- Generic[T]를 상속하면, 클래스 내부에서 제네릭 타입 변수 T를 사용할 수 있게 됩니다
- 이는 클래스 메서드나 인스턴스 메서드의 반환 타입 또는 매개변수 타입을 더 정확하게 명시할 수 있게 도와줍니다.
- 예:
def save(self: T, **kwargs) -> T | None:
- 이 메서드는 save()가 호출된 인스턴스의 타입을 그대로 반환하도록 명시합니다.
- 만약 ArticleDocument가 NoSQLBaseDocument를 상속받고 있다면, save() 메서드의 반환 타입은 ArticleDocument | None로 추론됩니다.
- 단순히 추상 클래스의 기능만 공유한다면 굳이 제네릭을 쓸 필요가 없을 수도 있습니다. 그러나 제네릭을 사용하면 클래스의 메서드가 호출되는 서브클래스의 구체적인 타입에 맞춰 정확한 타입 정보를 제공할 수 있으며, 이후 코드에서 타입 안정성과 가독성을 높여 보다 견고한 코드를 작성할 수 있게 됩니다.
고유 ID 필드와 비교/해시 메서드 구현:
- 각 문서는 고유 식별자로 UUID4를 사용하며, 자동으로 고유한 ID를 생성합니다.
- 이 메서드들은 인스턴스 간의 비교와 해시 기반 자료구조(예: 집합, 딕셔너리의 키)에서의 사용을 가능하게 합니다.
id: UUID4 = Field(default_factory=uuid.uuid4)
def __eq__(self, value: object) -> bool:
if not isinstance(value, self.__class__):
return False
return self.id == value.id
def __hash__(self) -> int:
return hash(self.id)
MongoDB와의 데이터 변환 메서드 - from_mongo 메소드:
- MongoDB에서 조회한 딕셔너리 형태의 데이터를 클래스 인스턴스로 변환합니다.
- MongoDB 문서의 기본 키(_id)를 클래스의 id 필드로 매핑합니다.
@classmethod
def from_mongo(cls: Type[T], data: dict) -> T:
if not data:
raise ValueError("Data is empty.")
id = data.pop("_id")
return cls(**dict(data, id=id))
MongoDB와의 데이터 변환 메서드 - to_mongo() 메서드:
- 클래스 인스턴스를 MongoDB에 저장 가능한 딕셔너리 형태로 변환합니다.
- 특히, id 필드를 _id로 매핑하며, UUID 타입의 값을 문자열로 변환합니다.
- 모델 내에 UUID 타입의 필드가 있다면, MongoDB에 저장할 때 UUID 객체를 그대로 저장할 수 없으므로 이를 문자열로 변환합니다.
- exclude_unset:
- 모델에서 값이 설정되지 않은 필드를 결과에서 제외할지 여부를 결정합니다.
- False 로 지정하면 별도로 설정되지 않은 필드도 포함합니다.
- by_alias:
- Pydantic 모델의 필드를 정의할 때 별칭(alias)을 설정한 경우, 해당 별칭을 사용하여 출력할지 여부를 결정합니다.
- True 로 설정하면 기본적으로 별칭을 사용하여 직렬화합니다.
- model_dump 메서드:
- Pydantic의 BaseModel에서 제공하는 메서드로, 모델의 데이터를 딕셔너리 형태로 변환합니다.
- 앞서 설정한 exclude_unset과 by_alias 옵션을 사용하여, 출력할 필드와 필드 이름(별칭 사용 여부)을 결정합니다.
def to_mongo(self: T, **kwargs) -> dict:
exclude_unset = kwargs.pop("exclude_unset", False)
by_alias = kwargs.pop("by_alias", True)
parsed = self.model_dump(exclude_unset=exclude_unset, by_alias=by_alias, **kwargs)
if "_id" not in parsed and "id" in parsed:
parsed["_id"] = str(parsed.pop("id"))
for key, value in parsed.items():
if isinstance(value, uuid.UUID):
parsed[key] = str(value)
return parsed
MongoDB 데이터 조회 메소드 - get_or_create() 메서드:
- 주어진 필터 옵션에 맞는 문서를 데이터베이스에서 검색합니다.
- 문서가 있으면 이를 인스턴스로 변환해 반환하고, 없으면 새 인스턴스를 생성하여 저장 후 반환합니다.
@classmethod
def get_or_create(cls: Type[T], **filter_options) -> T:
collection = _database[cls.get_collection_name()]
try:
instance = collection.find_one(filter_options)
if instance:
return cls.from_mongo(instance)
new_instance = cls(**filter_options)
new_instance = new_instance.save()
return new_instance
except errors.OperationFailure:
logger.exception(f"Failed to retrieve document with filter options: {filter_options}")
raise
MongoDB 데이터 조회 메소드 - find() 메소드:
@classmethod
def find(cls: Type[T], **filter_options) -> T | None:
collection = _database[cls.get_collection_name()]
try:
instance = collection.find_one(filter_options)
if instance:
return cls.from_mongo(instance)
return None
except errors.OperationFailure:
logger.error("Failed to retrieve document.")
return None
MongoDB 데이터 조회 메소드 - bulk_find() 메서드:
- 조건에 맞는 여러 문서를 검색하고, 각 문서를 인스턴스로 변환하여 리스트로 반환합니다.
@classmethod
def bulk_find(cls: Type[T], **filter_options) -> list[T]:
collection = _database[cls.get_collection_name()]
try:
instances = collection.find(filter_options)
return [document for instance in instances if (document := cls.from_mongo(instance)) is not None]
except errors.OperationFailure:
logger.error("Failed to retrieve document.")
return []
MongoDB - 데이터 저장 메소드 - bulk_insert() 메서드:
- 여러 문서를 한 번에 MongoDB 컬렉션에 삽입하는 기능을 제공합니다.
- 삽입 실패 시 오류를 기록하고 False를 반환합니다.
@classmethod
def bulk_insert(cls: Type[T], documents: list[T], **kwargs) -> bool:
collection = _database[cls.get_collection_name()]
try:
collection.insert_many([doc.to_mongo(**kwargs) for doc in documents])
return True
except (errors.WriteError, errors.BulkWriteError):
logger.error(f"Failed to insert documents of type {cls.__name__}")
return False
MongoDB 컬렉션 이름 결정 - get_collection_name() 메서드:
- 각 서브클래스는 내부의 Settings 클래스를 통해 자신에게 해당하는 MongoDB 컬렉션의 이름을 지정해야 합니다.
- 만약 Settings 클래스나 그 안의 name 속성이 없다면, ImproperlyConfigured 예외를 발생시켜 잘못된 구성을 알립니다.
@classmethod
def get_collection_name(cls: Type[T]) -> str:
if not hasattr(cls, "Settings") or not hasattr(cls.Settings, "name"):
raise ImproperlyConfigured(
"Document should define an Settings configuration class with the name of the collection."
)
return cls.Settings.name
Data categories and user document classes
NoSQLBaseDocument를 상속받아 구체적인 데이터 모델(문서)을 정의하는 부분입니다.
이 문서 클래스들은 기사, 게시글, 저장소 등 각 데이터 카테고리에 맞게 고유한 필드와 설정(예: 컬렉션 이름)을 갖도록 설계됩니다
또한, 공통 기능(예: CRUD 작업, 데이터 변환 등)은 부모 클래스인 NoSQLBaseDocument에서 제공하여 중복 코드를 줄이고 일관성을 유지합니다.
데이터 카테고리 Enum 클래스 정의:
- DataCategory는 여러 데이터 유형을 중앙에서 관리하는 상수 집합 역할을 합니다.
- 이를 통해 ODM 클래스들을 구성할 때, 각 문서가 속하는 컬렉션 이름이나 타입을 일관성 있게 지정할 수 있습니다.
- 또한, 이후에 다른 데이터 카테고리도 쉽게 추가하거나 수정할 수 있는 기반을 마련합니다.
from enum import StrEnum
class DataCategory(StrEnum):
PROMPT = "prompt"
QUERIES = "queries"
INSTRUCT_DATASET_SAMPLES = "instruct_dataset_samples"
추가적인 상수 정의:
- 각 데이터 유형에 대한 문자열 상수를 제공하여, 코드 내에서 직접 문자열을 반복 사용하지 않고 상수로 관리할 수 있도록 합니다.
INSTRUCT_DATASET = "instruct_dataset"
PREFERENCE_DATASET_SAMPLES = "preference_dataset_samples"
PREFERENCE_DATASET = "preference_dataset"
POSTS = "posts"
ARTICLES = "articles"
REPOSITORIES = "repositories"
데이터 수집 파이프라인과 ZenML
ZenML의 역할:
- ZenML은 데이터 수집 파이프라인의 오케스트레이션(워크플로우 관리)을 담당합니다.
- 파이프라인은 수동으로 실행할 수도 있고, 스케줄링하거나 특정 이벤트에 의해 자동으로 트리거될 수도 있습니다.
- 여기서는 수동 실행하는 방법에 초점을 맞춥니다.
각 저자별 파이프라인 구성:
- 각 저자(Paul Iusztin, Maxime Labonne 등)마다 별도의 ZenML 구성 파일(YAML)이 제공됩니다.
- 이 구성 파일에는 저자 이름과 함께 수집할 링크들이 포함되어 있습니다.
예시: Maxime Labonne의 데이터 수집 실행
- 이 명령어는 ZenML 구성 파일에 정의된 파라미터를 사용하여 Maxime Labonne의 데이터를 수집합니다.
poetry poe run-digital-data-etl-maxime
예시: ZenML 구성 파일 예시:
parameters:
user_full_name: Maxime Labonne # [First Name(s)] [Last Name]
links:
# Personal Blog
- https://mlabonne.github.io/blog/posts/2024-07-29_Finetune_llama31.html
- https://mlabonne.github.io/blog/posts/2024-07-15_The_Rise_of_Agentic_Data_Generation.html
# Substack
- https://maximelabonne.substack.com/p/uncensor-any-llm-with-abliteration-d30148b7d43e
- https://maximelabonne.substack.com/p/create-mixtures-of-experts-with-mergekit-11b318c99562
- https://maximelabonne.substack.com/p/merge-large-language-models-with-mergekit-2118fb392b54
... # More Substack links
실행 결과(DAG) 확인:
- ZenML 대시보드를 통해 파이프라인의 실행 흐름(DAG)과 각 단계의 상태, 메타데이터 등을 확인할 수 있습니다.
- 이 정보는 파이프라인 모니터링과 디버깅에 매우 유용합니다.
예시: Paul Iusztin의 데이터 수집 실행
- Paul Iusztin의 구성 파일은 Medium 및 Substack 링크를 포함하며, YAML 파일에 저자 이름과 링크들이 정의되어 있습니다.
poetry poe run-digital-data-etl-paul
예시: Paul Iusztin의 데이터 수집 실행의 CLI
poetry run python -m tools.run --run-etl --no-cache --etl-config-filename digital_data_etl_paul_iusztin.yaml
모든 저자에 대해 한 번에 파이프라인을 실행하는 명령도 별도로 제공됩니다:
poetry poe run-digital-data-etl
MongoDB 데이터 웨어하우스 쿼리 및 ODM 활용:
from llm_engineering.domain.documents import ArticleDocument, UserDocument
user = UserDocument.get_or_create(first_name="Paul", last_name="Iusztin")
articles = ArticleDocument.bulk_find(author_id=str(user.id))
print(f"User ID: {user.id}")
print(f"User name: {user.first_name} {user.last_name}")
print(f"Number of articles: {len(articles)}")
print("First article link:", articles[0].link)
'Generative AI > Data' 카테고리의 다른 글
파인튜닝을 위한 데이터 합성 방법 정리 (0) | 2025.02.04 |
---|---|
NVIDIA: Curating Trillion-Token Datasets: Introducing NVIDIA NeMo Data Curator (0) | 2025.01.26 |
NVIDIA: Synthetic Data Generation (0) | 2025.01.25 |
What Makes Good Data For Alignment? A Comprehensive Study of Automatic Data Selection In Instruction Tuning (0) | 2025.01.25 |
Alpagasus: Traning A better Alpaca with Fewer Data (0) | 2025.01.23 |