将大量数据从CSV文件导入到MongoDB
首先开发商务应用程序时,从CSV文件中将数据输入或更新到数据库中是经常遇到的需求。最大的好处莫过于可以使用大家都喜欢的Microsoft Excel进行编辑了吧。
在MongoDB的情况下,如果数据无需加工,可以使用mongoimport来部分满足需求。但通常从Excel导出的CSV文件需要考虑字符编码为Shift JIS,还需要处理日期和时间数据以及将其转换为特定的应用程序数据等,因此编写相应的程序来完成这些转换是很常见的情况。
请用中文陈述以下内容(仅需一个选项):本次话题
我将尝试制作一个脚本,将由伪造个人信息的样本数据生成的5000条包含所有字段的CSV数据重复生成6次,并将总共30000条数据导入MongoDB数据库。
在读取的同时转换CSV文件CSV文件的字符编码为Shift JIS,而数据库使用的是UTF-8,这是一个常见的情况。此外,CSV文件中混合了半角假名和全角英数字,这也是常见于非开发部门提供的文件的特点。
用PHP时,可以使用mb_convert_encoding()函数来转换字符编码,使用mb_convert_kana()进行全角半角的统一转换。但是,如果文件很大,我们尽量避免一次性将整个文件内容读入内存并进行转换。
逐行读取并即时转换…这个过程会变得非常繁琐,尤其是还要处理引号等等。
在PHP中有一个名为fgetcsv()的函数,非常适合用于基于行的顺序访问处理CSV数据。但在多字节环境中,它就像是一片雷区一样(在搜索fgetcsv时,你会发现很多前辈们的苦战记录)。
如果使用流过滤器进行等效的多字节转换,并在fgetcsv()函数中读取,就可以从这样的艰斗中带来福音。
-
- 巨大なSJISのCSVファイルをfgetcsv関数で処理する
- Stream_Filter_Mbstringを使えばCSVの処理が捗る!!
实施工作在下方所列示。
-
- Stream_Filter_Mbstring@Openpear
- xcezx/Stream_Filter_Mbstring@GitHub
这次我从GitHub下载了一个zip文件,并把src文件夹下的Stream目录直接放在了主文件夹下。我听说也可以用Composer安装,但似乎需要额外的步骤。
当加载使用fgetcsv来读取SJIS编码的CSV文件时。
尝试使用BulkWriteMongoDB提供了一种名为BulkWrite的功能,可用于对集合进行批量的INSERT/UPDATE/DELETE/UPSERT操作。
本次我们将使用PHP7 + MongoDB driver + mongodb/mongo-php-library来尝试使用这个BulkWrite。
这里介绍了在mongodb-php-library中使用BulkWrite的方法。
参考→批量写入
实际操作为了与直接插入和BulkWrite版本的PHP源代码以及所需的库相匹配,我们准备了composer.json文件。
这次我们使用了guiguiboy/PHP-CLI-Progress-Bar来显示进度。
在PHP的CLI脚本中,参考以下代码实现进度条:
{
"mongodb/mongodb": "^1.0.0",
"guiguiboy/php-cli-progress-bar": "dev-master"
}
一般来说,插入版本
<?php
require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';
stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');
$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();
sleep(1);
$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);
stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');
$time_start = microtime(true);
while ($columns = fgetcsv($fp, 4096)){
$collection->insertOne([
'name' => $columns[0],
'kana' => $columns[1],
'email' => $columns[2],
'gender' => $columns[3],
'age' => $columns[4],
'birth' => $columns[5],
'bride' => $columns[6],
'blood' => $columns[7],
'prefecture' => $columns[8],
'tel1' => $columns[9],
'tel2' => $columns[10],
'carrier' => $columns[11],
'curry' => $columns[12]
]);
$bar->advance();
}
$time_end = microtime(true);
$time = $time_end - $time_start;
echo $time . " seconds\n";
批量写入版本
<?php
require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';
stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');
$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();
sleep(1);
$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);
stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');
$time_start = microtime(true);
$bulks = [];
while ($columns = fgetcsv($fp, 4096)){
$bulks[] = [
'insertOne' => [[
'name' => $columns[0],
'kana' => $columns[1],
'email' => $columns[2],
'gender' => $columns[3],
'age' => $columns[4],
'birth' => $columns[5],
'bride' => $columns[6],
'blood' => $columns[7],
'prefecture' => $columns[8],
'tel1' => $columns[9],
'tel2' => $columns[10],
'carrier' => $columns[11],
'curry' => $columns[12]
]]
];
// 1000件ずつ書き込んでおいたほうが安全らしい
if(count($bulks) > 999){
$collection->bulkWrite($bulks);
$bulks = [];
}
$bar->advance();
}
// 残りのBulkwriteを処理する
if(count($bulks) > 0){
$collection->bulkWrite($bulks);
}
$time_end = microtime(true);
$time = $time_end - $time_start;
echo $time . " seconds\n";
在具有相同规格的机器上运行了大约3次的结果(单位为秒)
通常INSERTBulk Write1回目40.4524.522回目41.4326.053回目40.0925.15在实际工作中,我曾经编写了一个处理巨大约20万行的CSV文件的日常批处理程序。原始CSV文件的处理方法不太理想(可能是因为一次性加载到内存中导致发生了交换),导致处理时间非常长。但是,当我改用流过滤器读取和批量写入BulkWrite时,速度显著提升。
<?php
require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';
stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');
$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();
sleep(1);
$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);
stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');
$time_start = microtime(true);
while ($columns = fgetcsv($fp, 4096)){
$collection->insertOne([
'name' => $columns[0],
'kana' => $columns[1],
'email' => $columns[2],
'gender' => $columns[3],
'age' => $columns[4],
'birth' => $columns[5],
'bride' => $columns[6],
'blood' => $columns[7],
'prefecture' => $columns[8],
'tel1' => $columns[9],
'tel2' => $columns[10],
'carrier' => $columns[11],
'curry' => $columns[12]
]);
$bar->advance();
}
$time_end = microtime(true);
$time = $time_end - $time_start;
echo $time . " seconds\n";
<?php
require_once dirname(__FILE__) . '/vendor/autoload.php';
require_once dirname(__FILE__) . '/Stream/Filter/Mbstring.php';
stream_filter_register('convert.mbstring.*','Stream_Filter_Mbstring');
$mongoClient = new MongoDB\Client('mongodb://localhost:27017');
$db = $mongoClient->selectDatabase('nanchatte');
$collection = $db->selectCollection('persons');
$collection->drop();
sleep(1);
$fp = fopen('nanchatte.csv', 'r');
for($lines=0; fgets($fp); $lines ++);
rewind($fp);
print "$lines lines.\n";
$bar = new \ProgressBar\Manager(0, $lines);
stream_filter_append($fp,'convert.mbstring.encoding.SJIS-win:UTF-8');
stream_filter_append($fp,'convert.mbstring.kana.KVas:UTF-8');
$time_start = microtime(true);
$bulks = [];
while ($columns = fgetcsv($fp, 4096)){
$bulks[] = [
'insertOne' => [[
'name' => $columns[0],
'kana' => $columns[1],
'email' => $columns[2],
'gender' => $columns[3],
'age' => $columns[4],
'birth' => $columns[5],
'bride' => $columns[6],
'blood' => $columns[7],
'prefecture' => $columns[8],
'tel1' => $columns[9],
'tel2' => $columns[10],
'carrier' => $columns[11],
'curry' => $columns[12]
]]
];
// 1000件ずつ書き込んでおいたほうが安全らしい
if(count($bulks) > 999){
$collection->bulkWrite($bulks);
$bulks = [];
}
$bar->advance();
}
// 残りのBulkwriteを処理する
if(count($bulks) > 0){
$collection->bulkWrite($bulks);
}
$time_end = microtime(true);
$time = $time_end - $time_start;
echo $time . " seconds\n";
在具有相同规格的机器上运行了大约3次的结果(单位为秒)
通常INSERTBulk Write1回目40.4524.522回目41.4326.053回目40.0925.15在实际工作中,我曾经编写了一个处理巨大约20万行的CSV文件的日常批处理程序。原始CSV文件的处理方法不太理想(可能是因为一次性加载到内存中导致发生了交换),导致处理时间非常长。但是,当我改用流过滤器读取和批量写入BulkWrite时,速度显著提升。
通常INSERTBulk Write1回目40.4524.522回目41.4326.053回目40.0925.15在实际工作中,我曾经编写了一个处理巨大约20万行的CSV文件的日常批处理程序。原始CSV文件的处理方法不太理想(可能是因为一次性加载到内存中导致发生了交换),导致处理时间非常长。但是,当我改用流过滤器读取和批量写入BulkWrite时,速度显著提升。