Press "Enter" to skip to content

基于 C# 的 ETL 大数据并行编程

作者:James Spinella

 

译者:精致码农

 

原文:https://bit.ly/3nGQu4J

 

并行编程在历史上一直是软件开发中比较小众和复杂的环节,往往不值得头疼。但编写并行化应用只会越来越简单,一个应用同时利用设备 CPU 上的多个内核,来实现效率最大化也是很常见的。

 

如今,随着数据工程作为一个专业领域的兴起,并行编程比以往任何时候都更受欢迎。Apache Spark 是一个用于 Extract(提取), Transform(转换) 和 Load(加载)——ETL
大型数据集的软件库,可能是当今最流行的并行编程的方式。虽然 Apache 的 Spark、Hadoop 和 AirFlow 是对数据工程师来说最常见的技术,但它们的使用要求精通的不是 C#,而是 Python、Scala 或 Java
尽管不是很理想)。

 

对于 ETL 工作来说,“正确”的工具应该是 Spark 或 Hadoop 这样的工具。它们是专门为 ETL 设计的,与 C# 或其他语言相比,需要你编写的代码更少。如果你想在数据工程这个领域深耕,你最好的选择学习一下 Python 和 Spark(以及其他技术)。尽管如此,但有时正确的工具是你已经知道如何使用的工具。事实上,我已经发现 C# 和 .NET 可以胜任并行化 ETL 操作的任务。

 

 

微软也一直在开发 .NET for Apache Spark [1]
,它允许我们即使只会 C# 也能吃上 Spark 这块蛋糕。很大程度上仍在开发中,但目前你已经可以开始尝试了。(译注:想了更多关于 .NET for Apache Spark 的内容,可以阅读我的另一篇译文: 使用 .NET 5 体验大数据和机器学习

 

多年来,C# 的发展使并行编程变得越来越简单。因为 C# 与以前的版本保持 100% 的向后兼容,所以很难知道在众多并行运行代码的方法中哪种是最好的方法。事实上,在 .NET 中,有好几种方法可以启动多个线程,而真正的问题在于,你希望 .NET 在“背后”为你处理多少和你希望自己手动处理多少。

 

一般来说,我们确实希望 .NET 尽可能多的为我们处理,特别是在线程管理方面,因为并行运行代码是(编程时)非常复杂的,而且非常容易出现意外的运行时错误。事实上,微软在这里 [2]
有一篇专门的文档涵盖了并行编程的潜在陷阱。

 

我建议大家读一读,但就 ETL 和其他“数据处理”任务而言,我们真正需要担心的只有两件事: 并行化是否真的会更快,如果是,确保我们的代码是线程安全的。

 

 

CPU密集型 vs IO密集型

 

在确定“并行化”代码是否值得时,重要的是要了解应用程序的哪些部分是 CPU 密集型
(CPU-bound)而不是  IO 密集型
(IO-bound)的。正如你可能已经猜到的那样,并行化增强了 CPU 密集型代码的性能,它不仅不会对任何 IO 瓶颈(bottlenecks)产生改善,而且可能会加剧应用程序的 IO 瓶颈(相比之下,异步编程的目的是减少 IO 密集)。

 

 

CPU 密集型的代码通常是对程序中的对象进行的运算或其他操作。解析 CSV 文件、映射对象和计算平均值都依赖于 CPU。在处理数据时,效率来自于根据可用的 CPU 数量拆分数据集,基本上在每个 CPU 上同时运行程序–只是处理的是整个数据集不同的分组。

 

当代码的执行依赖于通过“网线”发送或接收数据时,即通过互联网或内部网络连接到另一个服务器时,代码就是 IO 密集型的。这种情况在调用 API 或数据库的存储库方法中最为常见。如果代码向持久性存储(如硬盘或固态硬盘)写入或读取,那幺它也可以是 IO 密集型的。如果我们使用的第三方 API 需要 10 秒才能将数据返回给我们的应用程序,我们也没有什幺办法,然而通过异步编程,我们至少可以让我们的程序在等待 API 调用的时候继续使用 CPU 运行其他部分,而不是闲置。

 

在处理每个数据集都依赖于 IO 密集型调用的情况下,比如调用数据库对处理后的数据进行 INSERT,那幺我们运行程序的 CPU 线程数与调用外部源(如 API 或数据库)的次数之间的平衡就显得非常重要。如果 API 有使用限制(例如每秒 10 个请求),或者数据库服务器没有足够的线程来处理我们的程序比如说 20 个线程都试图同时向同一个数据库 insert,那幺这一点就尤其重要。

 

在不使程序的 IO 密集型过载的情况下最大化效率和最小化程序运行时间,可能需要对程序在 ETL 过程的各个步骤中使用的线程数量进行一些试错式的调整。

 

 

Parallel.ForEach vs PLINQ

 

当使用 .NET 时,现在可以通过一个简单的 Parallel.ForEach 循环或 Parallel LINQ [3]
(PLINQ)来实现并行化应用程序所需的一切。对于 .NET 来说,这些并不是特别新的东西,但与它们的前身相比,它们更容易使用,因为它们需要创建和管理线程,并手动分割集合。Parallel.ForEach 和 PLINQ 两者都为我们处理了这一切,根据我的经验,两者之间的性能没有明显的差异。我猜测它们在底层调用的代码大致相同。

 

下面是分别使用 Parallel.ForEach 和 PLINQ 读取 CSV 文件的示例:

 

// PLINQ using all CPU cores
public static void PLINQAll(string filePath)
{
var sw = new Stopwatch();
    sw.Start();
var results = System.IO.File.ReadAllLines(filePath)
        .AsParallel()
        .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
        .ToList();
    sw.Stop();
    Console.WriteLine($"PLINQ using all cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");
}
// PLINQ using all CPU threads (2x cores)
public static void PLINQAllThreads(string filePath)
{
var threads = Environment.ProcessorCount * 2;
var sw = new Stopwatch();
    sw.Start();
var results = System.IO.File.ReadAllLines(filePath)
        .AsParallel()
        .WithDegreeOfParallelism(threads)
        .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
        .ToList();
    sw.Stop();
    Console.WriteLine($"PLINQ using all THREADS {(threads)}: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");
}
// PLINQ using 2 CPU cores
public static void PLINQ2(string filePath)
{
var sw = new Stopwatch();
    sw.Start();
var results = System.IO.File.ReadAllLines(filePath)
        .AsParallel()
        .WithDegreeOfParallelism(2)
        .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
        .ToList();
    sw.Stop();
    Console.WriteLine($"PLINQ using 2 cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");
}
// PLINQ using user-defined thread count
public static void PLINQUser(string filePath, int numThreads)
{
var sw = new Stopwatch();
    sw.Start();
var results = System.IO.File.ReadAllLines(filePath)
        .AsParallel()
        .WithDegreeOfParallelism(numThreads)
        .Select(line => Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
        .ToList();
    sw.Stop();
    Console.WriteLine($"PLINQ using {numThreads} threads: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");
}
// PLINQ using all CPU cores without Regex parsing - May yield bad data due to commas within fields
public static void PLINQNoRegex(string filePath)
{
var sw = new Stopwatch();
    sw.Start();
var results = System.IO.File.ReadAllLines(filePath)
        .AsParallel()
        .Select(x => x.Split(','))
        .ToList();
    sw.Stop();
    Console.WriteLine($"PLINQ using all cores (no Regex): completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");
}
// Parallel.ForEach using all CPU cores - May yield bad data
public static void ParallelForEach(string filePath)
{
var sw = new Stopwatch();
    sw.Start();
var rows = new List<string[]>();
    Parallel.ForEach(File.ReadLines(filePath), line =>
    {
        rows.Add(line.Split(','));
    });
    sw.Stop();
    Console.WriteLine($"Parallel.ForEach using all cores (no Regex): completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");
}
// Parallel.ForEach using all CPU cores with Regex parsing - takes roughly the same amount of time as PLINQ
public static void ParallelForEachRegex(string filePath)
{
var sw = new Stopwatch();
    sw.Start();
var rows = new List<string[]>();
    Parallel.ForEach(File.ReadLines(filePath), line =>
    {
        rows.Add(Regex.Split(line, ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"));
    });
    sw.Stop();
    Console.WriteLine($"Parallel.ForEach w/Regex using all cores: completed in {Math.Round(sw.Elapsed.TotalSeconds)} seconds");
}

 

那幺应该选择哪一种方式呢?我建议根据你在非并行情况下使用哪种方法来决定–你会使用 foreach
循环还是  LINQ
查询?也就是说,这两者之间最大的、不明显的区别是 Parallel.ForEach 允许你指定线程数,最多不超过计算机或服务器上可用的线程数(你可以指定更多的线程数,但它最多只能启动 CPU 所拥有的线程数)。然而,根据你的指定,PLINQ 可以使用超过计算机上 CPU 线程数的线程(译注:不是所有的线程都是工作中的,所以理论上可以创建无数个线程)。一般情况下,你不会希望启动超出比 CPU 线程更多的线程,但在某些情况下,这样做会更有效。例如,如果你要写一个网络爬虫,那幺启动双倍的线程数量可能是有意义的,因为每个线程大概都要等待网站的加载。其他这样的网络密集型(IO 密集型的一个子集)任务可能会在更多线程的情况下运行得更快。

 

Parallel.ForEach 和 PLINQ 之间还有一些其他的区别,在这里 [4]
进行了讨论。但是对于 ETL 来说,除非在非常特殊的情况下,这些区别不太适用。例如,如果必须保留数据的顺序,你应该使用 PLINQ,因为它提供了一种保留顺序的方法。

 

 

示例程序

 

我们的 ELT 示例程序(仓库地址 [5]
)将做以下四件事件:

 

 

读取 CSV 文件;

 

将这些 CSV 文件中的字段映射到 C# 对象;

 

对数据(对象列表)执行一些转换操作;

 

将这些数据插入到数据库中。

 

 

我将在接下来即将发布的第二篇文章中继续介绍该示例。

 

文中链接:

 

[1].
https://dotnet.microsoft.com/apps/data/spark

 

[2].
https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/potential-pitfalls-in-data-and-task-parallelism

 

[3].
https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq

 

[4].

When To Use Parallel.ForEach and When to Use PLINQ

 

[5].
https://gitlab.com/jspinella/parallel-etl-examples

 

Be First to Comment

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注