将大量数据从CSV文件导入到MongoDB

首先开发商务应用程序时,从CSV文件中将数据输入或更新到数据库中是经常遇到的需求。最大的好处莫过于可以使用大家都喜欢的Microsoft Excel进行编辑了吧。

在MongoDB的情况下,如果数据无需加工,可以使用mongoimport来部分满足需求。但通常从Excel导出的CSV文件需要考虑字符编码为Shift JIS,还需要处理日期和时间数据以及将其转换为特定的应用程序数据等,因此编写相应的程序来完成这些转换是很常见的情况。

请用中文陈述以下内容(仅需一个选项):本次话题

我将尝试制作一个脚本,将由伪造个人信息的样本数据生成的5000条包含所有字段的CSV数据重复生成6次,并将总共30000条数据导入MongoDB数据库。

在读取的同时转换CSV文件CSV文件的字符编码为Shift JIS,而数据库使用的是UTF-8,这是一个常见的情况。此外,CSV文件中混合了半角假名和全角英数字,这也是常见于非开发部门提供的文件的特点。

用PHP时,可以使用mb_convert_encoding()函数来转换字符编码,使用mb_convert_kana()进行全角半角的统一转换。但是,如果文件很大,我们尽量避免一次性将整个文件内容读入内存并进行转换。

逐行读取并即时转换…这个过程会变得非常繁琐,尤其是还要处理引号等等。

在PHP中有一个名为fgetcsv()的函数,非常适合用于基于行的顺序访问处理CSV数据。但在多字节环境中,它就像是一片雷区一样(在搜索fgetcsv时,你会发现很多前辈们的苦战记录)。

如果使用流过滤器进行等效的多字节转换,并在fgetcsv()函数中读取,就可以从这样的艰斗中带来福音。

    • 巨大なSJISのCSVファイルをfgetcsv関数で処理する

 

    Stream_Filter_Mbstringを使えばCSVの処理が捗る!!

实施工作在下方所列示。

    • Stream_Filter_Mbstring@Openpear

 

    xcezx/Stream_Filter_Mbstring@GitHub

这次我从GitHub下载了一个zip文件,并把src文件夹下的Stream目录直接放在了主文件夹下。我听说也可以用Composer安装,但似乎需要额外的步骤。

当加载使用fgetcsv来读取SJIS编码的CSV文件时。

尝试使用BulkWriteMongoDB提供了一种名为BulkWrite的功能,可用于对集合进行批量的INSERT/UPDATE/DELETE/UPSERT操作。
本次我们将使用PHP7 + MongoDB driver + mongodb/mongo-php-library来尝试使用这个BulkWrite。

这里介绍了在mongodb-php-library中使用BulkWrite的方法。

参考→批量写入

实际操作为了与直接插入和BulkWrite版本的PHP源代码以及所需的库相匹配,我们准备了composer.json文件。
这次我们使用了guiguiboy/PHP-CLI-Progress-Bar来显示进度。

在PHP的CLI脚本中,参考以下代码实现进度条:

{
    "mongodb/mongodb": "^1.0.0",
    "guiguiboy/php-cli-progress-bar": "dev-master"
}

一般来说,插入版本

<?php

require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';

stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');

$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();

sleep(1);

$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);


stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');

$time_start = microtime(true);
while ($columns = fgetcsv($fp, 4096)){
    $collection->insertOne([
        'name' => $columns[0],
        'kana' => $columns[1],
        'email' => $columns[2],
        'gender' => $columns[3],
        'age' => $columns[4],
        'birth' => $columns[5],
        'bride' => $columns[6],
        'blood' => $columns[7],
        'prefecture' => $columns[8],
        'tel1' => $columns[9],
        'tel2' => $columns[10],
        'carrier' => $columns[11],
        'curry' => $columns[12]
    ]);
    $bar->advance();
}
$time_end = microtime(true);
$time = $time_end - $time_start;

echo $time . " seconds\n";

批量写入版本

<?php

require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';

stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');

$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();

sleep(1);

$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);


stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');

$time_start = microtime(true);
$bulks = [];
while ($columns = fgetcsv($fp, 4096)){
    $bulks[] = [
        'insertOne' => [[
            'name' => $columns[0],
            'kana' => $columns[1],
            'email' => $columns[2],
            'gender' => $columns[3],
            'age' => $columns[4],
            'birth' => $columns[5],
            'bride' => $columns[6],
            'blood' => $columns[7],
            'prefecture' => $columns[8],
            'tel1' => $columns[9],
            'tel2' => $columns[10],
            'carrier' => $columns[11],
            'curry' => $columns[12]
        ]]
    ];
    // 1000件ずつ書き込んでおいたほうが安全らしい
    if(count($bulks) > 999){
        $collection->bulkWrite($bulks);
        $bulks = [];
    }
    $bar->advance();
}

// 残りのBulkwriteを処理する
if(count($bulks) > 0){
    $collection->bulkWrite($bulks);
}

$time_end = microtime(true);
$time = $time_end - $time_start;

echo $time . " seconds\n";

在具有相同规格的机器上运行了大约3次的结果(单位为秒)

通常INSERTBulk Write1回目40.4524.522回目41.4326.053回目40.0925.15在实际工作中,我曾经编写了一个处理巨大约20万行的CSV文件的日常批处理程序。原始CSV文件的处理方法不太理想(可能是因为一次性加载到内存中导致发生了交换),导致处理时间非常长。但是,当我改用流过滤器读取和批量写入BulkWrite时,速度显著提升。

bannerAds