Advertisement

Parallel.ForEach处理List导致数据丢失

阅读量:

是同一个dt,但执行之后的结果行数不同,代码如下:

复制代码
  List<logistics_freight_bill_tbl> insertList1 = new List<logistics_freight_bill_tbl>();

    
         foreach (DataRow row in dt.Rows)
    
         {
    
             logistics_freight_bill_tbl info = new logistics_freight_bill_tbl();
    
             info.address = row["目的地"].ToString();
    
             insertList1.Add(info);
    
         }
    
  
    
         List<logistics_freight_bill_tbl> insertList2 = new List<logistics_freight_bill_tbl>();
    
         Parallel.ForEach(dt.Rows.Cast<DataRow>(), new ParallelOptions() { MaxDegreeOfParallelism = 10 }, async (row) =>
    
         {
    
             logistics_freight_bill_tbl info = new logistics_freight_bill_tbl();
    
             info.address = row["目的地"].ToString();
    
             insertList2.Add(info);
    
         });

insertList1 是在一个普通的 foreach 循环中被填充,而 insertList2 是在一个并行循环 Parallel.ForEach 中被填充。问题出现在对 insertList2 的操作上,因为 List<T> 类型不是线程安全的。

当你在 Parallel.ForEach 中使用 insertList2.Add(info) 时,可能会有多个线程同时尝试向 insertList2 添加元素。由于 List<T> 没有同步机制来保护其内部数据结构,在并发环境下这可能导致数据丢失或异常,从而使得 insertList2 的最终元素数量与 dt.Rows 的行数不一致。

要解决这个问题,你可以采取以下几种方法之一:

1. 使用线程安全集合

例如,ConcurrentBag<T> 是一个线程安全的集合,适用于并行操作。

复制代码
 ConcurrentBag<logistics_freight_bill_tbl> insertList2 = new ConcurrentBag<logistics_freight_bill_tbl>();

    
 Parallel.ForEach(dt.Rows.Cast<DataRow>(), new ParallelOptions() { MaxDegreeOfParallelism = 10 }, (row) =>
    
 {
    
     logistics_freight_bill_tbl info = new logistics_freight_bill_tbl();
    
     info.address = row["目的地"].ToString();
    
     insertList2.Add(info);
    
 });

2. 使用锁

通过在添加元素时使用锁来确保每次只有一个线程能够修改列表。

复制代码
 List<logistics_freight_bill_tbl> insertList2 = new List<logistics_freight_bill_tbl>();

    
 object lockObj = new object();
    
 Parallel.ForEach(dt.Rows.Cast<DataRow>(), new ParallelOptions() { MaxDegreeOfParallelism = 10 }, (row) =>
    
 {
    
     logistics_freight_bill_tbl info = new logistics_freight_bill_tbl();
    
     info.address = row["目的地"].ToString();
    
     lock (lockObj)
    
     {
    
     insertList2.Add(info);
    
     }
    
 });

3. 收集并行结果然后合并

在并行循环中,为每个线程创建局部列表,然后在循环结束后将它们合并到主列表中。

复制代码
 List<logistics_freight_bill_tbl> insertList2 = new List<logistics_freight_bill_tbl>();

    
 Parallel.ForEach(dt.Rows.Cast<DataRow>(),
    
     new ParallelOptions() { MaxDegreeOfParallelism = 10 },
    
     () => new List<logistics_freight_bill_tbl>(), // 初始化线程局部变量
    
     (row, state, localList) => // body
    
     {
    
     logistics_freight_bill_tbl info = new logistics_freight_bill_tbl();
    
     info.address = row["目的地"].ToString();
    
     localList.Add(info);
    
     return localList;
    
     },
    
     (finalResult) => // localFinally
    
     {
    
     lock (insertList2)
    
     {
    
         insertList2.AddRange(finalResult);
    
     }
    
     }
    
 );

在这些方法中,选择哪一种取决于你的具体需求和性能考虑。如果保持顺序不是必须的,ConcurrentBag<T> 是一个简单有效的选择。如果需要保持特定的顺序,那么使用锁或者局部集合的方法可能更合适。

全部评论 (0)

还没有任何评论哟~