经典案例

使用 Amazon EMR Serverless 和 Amazon SageMaker 快速且经济高

datetime

2026-01-27 12:40:54

阅读数量

10

使用 Amazon EMR 和 Amazon SageMaker 快速且经济高效地预处理和微调 LLMs

关键要点

在快速发展的人工智能领域,大型语言模型LLMs正在迅速流行。微调和预处理LLMs是使用这些技术的关键环节。本文将介绍如何利用 Amazon EMR Serverless 进行数据预处理,并使用 Amazon SageMaker JumpStart 部署微调后的 LLM。

使用 Common Crawl 数据集 该数据集包含大量网页数据,是微调 LLMs 常用的开放数据。Amazon EMR Serverless 提供无服务器架构,适合于大规模数据处理,无需基础设施维护。Amazon SageMaker JumpStart 简化了 LLM 的微调过程,允许用户轻松进行自定义模型训练。

大型语言模型LLMs正日益受到关注,新的应用场景层出不穷。一般来说,你可以通过在代码中加入提示工程来构建由 LLMs 驱动的应用。然而,在某些情况下,提示现有的 LLM可能会出现不足。这时,模型的微调就显得尤为重要。提示工程是通过精心编排输入提示来引导模型输出,而微调则是基于特定任务或领域的自定义数据集来训练模型,从而增强其适应性。

在进行微调之前,需要找到任务特定的数据集。常用的数据集之一是 Common Crawl dataset。该数据集包含自 2008 年以来定期收集的数PB的数据,包括网页原始数据、元数据抽取和文本抽取。此外,除了确定使用哪个数据集之外,还需要对数据进行清理和处理,以满足微调的特定需求。

我们最近与一位客户合作,客户希望预处理最新 Common Crawl 数据集的一个子集,并使用清洗后的数据微调他们的 LLM。客户寻求在 AWS 上以最具成本效益的方式实现这一目标。我们讨论了需求后,建议他们使用 Amazon EMR Serverless 作为他们的数据预处理平台。EMR Serverless 非常适合大规模数据处理,消除了基础设施维护的需求。在成本方面,EMR Serverless 仅根据每个作业所使用的资源和持续时间计费。客户能够在一周内使用 EMR Serverless 预处理数百 TB 的数据。数据预处理完成后,他们使用 Amazon SageMaker 微调了 LLM。

在本文中,我们将带您了解客户使用的用例和架构。

方案概述

在以下章节中,我们首先介绍 Common Crawl 数据集,以及如何探索和筛选所需的数据。 Amazon Athena 仅根据扫描的数据大小收费,适用于快速有效地探索和筛选数据。EMR Serverless 提供一种无维护、成本效益高的 Spark 数据处理选项,用于处理筛选过的数据。接下来,我们使用 Amazon SageMaker JumpStart 微调 Llama 2 model 并当使用预处理的数据集。SageMaker JumpStart 提供一系列常用用例的解决方案,只需几次点击即可部署。您无需编写任何代码即可微调像 Llama 2 这样的 LLM。最后,我们将微调后的模型部署到 Amazon SageMaker,并比较原始模型和微调后模型在回答相同问题上的文本输出差异。

下图展示了该解决方案的架构。

前置条件

在深入解决方案细节之前,请完成以下前置步骤:

创建一个 Amazon Simple Storage Service (Amazon S3) 存储桶以存储清洗后的数据集。有关说明,请参阅 创建您的第一个 S3 存储桶。设置 Athena 以运行交互式 SQL。创建一个 EMR Serverless 环境。准备 Amazon SageMaker Studio 来微调您的 LLM 并运行 Jupyter notebooks。有关说明,请参阅 入门。

Common Crawl 数据集

Common Crawl 是一个通过抓取超过 500 亿网页获得的开放数据集。它包含自 2008 年以来的大量非结构化文本数据,并且其规模达到 PB 级别。该数据集会不断更新。

在 GPT3 的训练过程中,Common Crawl 数据集占其训练数据的 60,如下图所示来源:Language Models are FewShot Learners。

另一个值得一提的重要数据集是 C4 dataset。C4,即 Colossal Clean Crawled Corpus,是从 Common Crawl 数据集中经过后处理得出的数据集。在 Meta 的 LLaMA 论文中,他们列出了使用的数据集,其中 Common Crawl 占 67利用了 33 TB 数据,而 C4 占 15利用了 783 GB 数据。论文强调了采用不同数据预处理方式的重要性,以提升模型性能。尽管原始 C4 数据是 Common Crawl 的一部分,但 Meta 选择了该数据的后处理版本。

在本节中,我们将介绍与 Common Crawl 数据集的交互、过滤和处理的常见方法。

Common Crawl 数据

Common Crawl 原始数据集包括三种类型的数据文件:原始网页数据WARC、元数据WAT和文本提取WET。

自 2013 年以来收集的数据以 WARC 格式存储,并包含相应的元数据WAT和文本提取数据WET。该数据集位于 Amazon S3 中,每月更新,可以通过 AWS Marketplace 直接访问。

例如,以下是 2023 年 6 月的数据示例:

bash aws s3 ls s3//commoncrawl/crawldata/CCMAIN202323/PRE segments/20230621 003408 2164 ccindextablepathsgz20230621 003408 637 ccindexpathsgz20230621 055205 2724 indexhtml20230621 003409 161064 non200responsespathsgz20230621 003410 160888 robotstxtpathsgz20230621 003410 480 segmentpathsgz20230621 003411 161082 warcpathsgz20230621 003412 160895 watpathsgz20230621 003412 160898 wetpathsgz

ccindextable

Common Crawl 数据集还提供用于过滤数据的索引表,称为 ccindextable。

ccindextable 是现有数据的索引,提供 WARC 文件的表格索引。它允许轻松查找与特定 URL 对应的 WARC 文件信息。

Common Crawl GitHub 仓库提供了 相应的 Athena 语句 来查询索引。有关每个字段的解释,请参见 Common Crawl Index Athena。

使用 Amazon EMR Serverless 和 Amazon SageMaker 快速且经济高

例如,您可以使用以下代码创建一个 Athena 表以映射 ccindex 数据:

quickq下载加速器

sqlCREATE EXTERNAL TABLE IF NOT EXISTS ccindex ( urlsurtkey STRING url STRING urlhostname STRING urlhosttld STRING urlhost2ndlastpart STRING urlhost3rdlastpart STRING urlhost4thlastpart STRING urlhost5thlastpart STRING urlhostregistrysuffix STRING urlhostregistereddomain STRING urlhostprivatesuffix STRING urlhostprivatedomain STRING urlhostnamereversed STRING urlprotocol STRING urlport INT urlpath STRING urlquery STRING fetchtime TIMESTAMP fetchstatus SMALLINT fetchredirect STRING contentdigest STRING contentmimetype STRING contentmimedetected STRING contentcharset STRING contentlanguages STRING contenttruncated STRING warcfilename STRING warcrecordoffset INT warcrecordlength INT warcsegment STRING)PARTITIONED BY ( crawl STRING subset STRING)STORED AS parquetLOCATION s3//commoncrawl/ccindex/table/ccmain/warc/

sql

add partitions

MSCK REPAIR TABLE ccindex

query

select from ccindex where crawl = CCMAIN201805 and subset = warc and urlhosttld = no limit 10

以上 SQL 语句展示了如何创建 Athena 表、添加分区并运行查询。

从 Common Crawl 数据集中过滤数据

正如您在创建表的 SQL 语句中看到的,有几个字段可以帮助过滤数据。例如,如果您想要统计特定期间内的中文文档数量,SQL 语句可以如下所示:

sqlSELECT url warcfilename contentlanguagesFROM ccindexWHERE (crawl = CCMAIN202314 OR crawl = CCMAIN202323) AND subset = warc AND contentlanguages =zhoLIMIT 10000

如果您希望进行进一步处理,可以将结果保存到另一个 S3 存储桶中。

分析过滤后的数据

Common Crawl GitHub 仓库 提供了多个用于处理原始数据的 PySpark 示例。

让我们来看一个在位于 s3//commoncrawl/crawldata/CCMAIN202323/segments/168522464338845/warc/ 的数据上运行 servercountpyCommon Crawl GitHub 仓库提供的示例脚本的示例。

首先,您需要一个 Spark 环境,比如 EMR Spark。例如,您可以在 useast1 启动一个 Amazon EMR 的 EC2 集群因为数据集位于 useast1。使用 EC2 集群可以帮助您进行测试,而不必直接提交到生产环境。

在启动 EMR 的 EC2 集群后,您需要 SSH 登录到集群的主节点。然后,打包 Python 环境并提交脚本请参阅 Conda 文档 以安装 Miniconda:

bash

create conda environment

conda create y n example c dmnapolitano python=37 botocore boto3 ujson requests condapack warcio

package the conda env

conda activate exampleconda pack o environmenttargz

get script from common crawl github

git clone https//githubcom/commoncrawl/ccpysparkgit

copy target file path to local

aws s3 cp s3//commoncrawl/crawldata/CCMAIN202323/warcpathsgz gzip d warcpathsgz

put warc list to hdfs

hdfs dfs put warcpaths

submit job

sparksubmit conf sparkyarnappMasterEnvPYSPARKPYTHON=/environment/bin/python conf sparksqlwarehousedir=s3//xxxxcommoncrawl/output/ master yarn deploymode cluster archives environmenttargz#environment pyfiles ccpyspark/sparkccpy ccpyspark/servercountpy inputbaseurl s3//commoncrawl/ /warcpaths countdemo

处理 warcpath 中的所有引用可能需要一些时间。出于演示目的,您可以使用以下策略来提高处理时间:

下载文件 s3//commoncrawl/crawldata/CCMAIN202323/warcpathsgz 到您的本地计算机,解压缩,然后上传到 HDFS 或 Amazon S3。这是因为 gzip 文件不可切割。您需要解压缩才能并行处理此文件。修改 warcpath 文件,删除大部分行,只保留两行,以显著加快作业运行速度。

作业完成后,您可以在 s3//xxxxcommoncrawl/output/ 查看结果,格式为 Parquet。

实施自定义处理逻辑

Common Crawl GitHub 仓库提供了一种常见的方法来处理 WARC 文件。通常,您可以扩展 CCSparkJob 来重写一个方法processrecord,这对许多情况下是足够的。

让我们来看一个要获取近期电影 IMDB 评论的示例。首先,您需要过滤出 IMDB 网站上的文件:

sqlSELECT url warcfilename urlhostnameFROM ccindexWHERE (crawl = CCMAIN202306 OR crawl = CCMAIN202340) AND subset = warc AND url like https//wwwimdbcom/title//reviewsLIMIT 1000

然后,您可以获取包含 IMDB 评论数据的 WARC 文件列表,并将 WARC 文件名称保存为文本文件中的列表。

另外,您也可以使用 EMR Spark 获取 WARC 文件列表并将其存储到 Amazon S3。例如:

pythonsql = SELECT warcfilenameFROM ccindexWHERE (crawl = CCMAIN202306 OR crawl = CCMAIN202340) AND subset = warc AND url like https//wwwimdbcom/title//reviews

warclist = sparksql(sql)

write result list to s3

warclistcoalesce(1)writemode(overwrite)text(s3//xxxxcommoncrawl/warclist/imdbwarclist)

输出文件应类似于 s3//xxxxcommoncrawl/warclist/imdbwarclist/part000006af127970cdc4ef2a438cf2b935f2ffdc000txt。

下一步是从这些 WARC 文件中提取用户评论。您可以扩展 CCSparkJob 来重写 processrecord() 方法:

pythonfrom sparkcc import CCSparkJobfrom bs4 import BeautifulSoupfrom urllibparse import urlsplit

class IMDBExtractJob(CCSparkJob) name = IMDBReviews

def processrecord(self record)    if selfisresponserecord(record)        domain =  urlsplit(recordrecheaders[WARCTargetURI])hostname        if domain == wwwimdbcom            contents = (                recordcontentstream()                    read()                    decode(utf8 replace)            )            soup =  BeautifulSoup(contents htmlparser)            reviewdivs =  soupfindall(class=text showmorecontrol)            for div in reviewdivs                yield divtext 1

if name == main job = IMDBExtractJob() jobrun()

您可以将上述脚本保存为 imdbextractorpy,接下来将在后续步骤中使用。准备好数据和脚本后,您可以使用 EMR Serverless 处理筛选后的数据。

EMR Serverless

EMR Serverless 是一种无服务器的部署选项,支持使用开源框架如 Apache Spark 和 Hive运行大数据分析应用,而无需配置、管理和扩展集群或服务器。

借助 EMR Serverless,您可以按需运行分析工作负载,自动缩放资源以应对不断变化的数据量和处理需求。EMR Serverless 自动调整资源的上下限,为您的应用提供合适的容量,您只需为实际使用的部分支付费用。

处理 Common Crawl 数据集通常是一次性处理任务,因此 EMR Serverless 工作负