1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: 40: 41: 42: 43: 44: 45: 46: 47: 48: 49: 50: 51: 52: 53: 54: 55: 56: 57: 58: 59: 60: 61: 62: 63: 64: 65: 66: 67: 68: 69: 70: 71: 72: 73: 74: 75: 76: 77: 78: 79: 80: 81: 82: 83: 84: 85: 86: 87: 88: 89: 90: 91: 92: 93: 94: 95: 96: 97: 98: 99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 182: 183: 184: 185: 186: 187: 188: 189: 190: 191: 192: 193: 194: 195: 196: 197: 198: 199: 200: 201: 202: 203: 204: 205: 206: 207: 208: 209: 210: 211: 212: 213: 214: 215: 216: 217: 218: 219: 220: 221: 222: 223: 224: 225: 226: 227: 228: 229: 230: 231: 232: 233: 234: 235: 236: 237: 238: 239: 240: 241: 242: 243: 244: 245: 246: 247: 248: 249: 250: 251: 252: 253: 254: 255: 256: 257: 258: 259: 260: 261: 262: 263: 264: 265: 266: 267: 268: 269: 270: 271: 272: 273: 274: 275: 276: 277: 278: 279: 280: 281: 282: 283: 284: 285: 286: 287: 288: 289: 290: 291: 292: 293: 294: 295: 296: 297: 298: 299: 300: 301: 302: 303: 304: 305: 306: 307: 308: 309: 310: 311: 312: 313: 314: 315: 316: 317: 318: 319: 320: 321: 322: 323: 324: 325: 326: 327: 328: 329: 330: 331: 332: 333: 334: 335: 336: 337: 338: 339: 340: 341: 342: 343: 344: 345: 346: 347: 348: 349: 350: 351: 352: 353: 354: 355: 356: 357: 358: 359: 360: 361: 362: 363: 364: 365: 366: 367: 368: 369: 370: 371: 372: 373: 374: 375: 376: 377: 378: 379: 380: 381: 382: 383: 384: 385: 386: 387: 388: 389: 390: 391: 392: 393: 394: 395: 396: 397: 398: 399: 400: 401: 402: 403: 404: 405: 406: 407: 408: 409: 410: 411: 412: 413: 414: 415: 416: 417: 418: 419: 420: 421: 422: 423: 424: 425: 426: 427: 428: 429: 430: 431: 432: 433: 434: 435: 436: 437: 438: 439: 440: 441: 442: 443: 444: 445: 446: 447: 448: 449: 450: 451: 452: 453: 454: 455: 456: 457: 458: 459: 460: 461: 462: 463: 464: 465: 466: 467: 468: 469: 470: 471: 472: 473: 474: 475: 476: 477: 478: 479: 480: 481: 482: 483: 484: 485: 486: 487: 488: 489: 490: 491: 492: 493: 494: 495: 496: 497: 498: 499: 500: 501: 502: 503: 504: 505: 506: 507: 508: 509: 510: 511: 512: 513: 514: 515: 516: 517: 518: 519: 520: 521: 522: 523: 524: 525: 526: 527: 528: 529: 530: 531: 532: 533: 534: 535: 536: 537: 538: 539: 540: 541: 542: 543: 544: 545: 546: 547: 548: 549: 550: 551: 552: 553: 554: 555: 556: 557: 558: 559: 560: 561: 562: 563: 564: 565: 566: 567: 568: 569: 570: 571: 572: 573: 574: 575: 576: 577: 578: 579: 580: 581: 582: 583: 584: 585: 586: 587: 588: 589: 590: 591: 592: 593: 594: 595: 596: 597: 598: 599: 600: 601: 602: 603: 604: 605: 606: 607: 608: 609: 610: 611: 612: 613: 614: 615: 616: 617: 618: 619: 620: 621: 622: 623: 624: 625: 626: 627: 628: 629: 630: 631: 632: 633: 634: 635: 636: 637: 638: 639: 640: 641: 642:
<?php
namespace Kanryu\QuickCsv;
class QuickCsvImporter
{
public $pdo = null;
public $query_callback = null;
public $fieldSchema = array();
public $fieldMap = array();
public $dataTableName = 'tempCsvData';
public $destTableName = 'TARGET_TABLE';
public $destPrimaryKey = 'PRIMARY_KEY';
public $csvSeparator = "','";
public $csvEncloser = "'\"'";
public $csvLineStart = "''";
public $csvLineSep = "'\\r\\n'";
public $csvRecordId = 'id';
public $hasCsvHeader = true;
public $asTemporary = true;
public $dumpSql = false;
public $tableCharCode = 'utf8';
public $csvCharCode = 'cp932';
public $validators = array();
public $validatorTypes = array();
public function __construct($options = array())
{
$this->query_callback = function ($sql, $params, $api) {
throw new Exception('must be implemented');
};
$this->setValidator('maxlength', function(&$v, $m, $name) {$v["{$name}_maxlength"] = "CHAR_LENGTH({$name}) > {$m->maxlength}";});
$this->setValidator('required', function(&$v, $m, $name) {$v["{$name}_required"] = "{$name} = ''";});
$this->setValidator('custom', function(&$v, $m, $name) {
$v["{$name}_custom"] = isset($m->required)
? "NOT {$m['custom']}"
: "({$name} != '' AND NOT ({$m->custom}))";
});
$this->setValidator('type', function(&$v, $m, $name) {
$empty_check = $this->getNotDefaultFormula($name);
$type = $m->type;
if (array_key_exists($type, $this->validatorTypes)) {
$this->validatorTypes[$type]($v, $m, $name, $empty_check);
} else {
$v["{$name}_notdecimal"] = "({$empty_check} AND {$name} = 0 AND {$name} != '0') OR CAST({$name} AS {$type}) != {$name}";
}
});
$this->setValidatorForType('varchar', function(&$v, $m, $name, $empty_check) {});
$this->setValidatorForType('datetime', function(&$v, $m, $name, $empty_check) {$v["{$name}_not{$m->type}"] = "{$empty_check} AND {$name} != '' AND DAYOFYEAR({$name}) = 0";});
$this->setValidatorForType('date', $this->validatorTypes['datetime']);
$this->setValidatorForType('alphanumeric', function(&$v, $m, $name, $empty_check) {$v["{$name}_not{$m->type}"] = "({$empty_check} AND {$name} != '' AND NOT {$name} REGEXP '^[a-zA-Z0-9\-]+$')";});
$this->setProperties($options);
}
public function setProperties($options = array())
{
foreach ($options as $p => $v) {
if ($p == 'fieldSchema') {
$this->setFieldSchema($v);
} elseif ($p == 'pdo') {
$this->setPdo($v);
} else {
$this->$p = $v;
}
}
}
public function setValidator($name, $func)
{
$this->validators[$name] = $func;
}
public function setValidatorForType($name, $func)
{
$this->validatorTypes[$name] = $func;
}
public function setFieldSchema($schema)
{
$this->fieldSchema = $schema;
$this->fieldMap = array();
foreach ($schema as $i => $v) {
$v = is_array($v) ? (object)$v : $v;
$name = isset($v->name) ? $v->name : $i;
$this->fieldMap[$name] = $v;
}
}
public function setPdo($pdo)
{
$this->pdo = $pdo;
$that = $this;
$this->query_callback = function ($sql, $params, $api) use($that) {
if ($that->dumpSql) {
echo "----------- {$api}:\n";
echo "$sql\n";
}
$stmt = $this->pdo->prepare($sql);
$result = $stmt->execute($params);
switch($api) {
case 'validateAllFields':
return $stmt->fetchAll(\PDO::FETCH_ASSOC);
break;
default:
return $result;
}
};
}
public function setQueryCallback($query_callback)
{
$this->query_callback = $query_callback;
}
public function execQuery($sql, $params, $api)
{
return call_user_func_array($this->query_callback, func_get_args());
}
public function create($dataTableName = null, $schema = null)
{
if (!empty($dataTableName)) {
$this->dataTableName = $dataTableName;
}
if (!empty($fields)) {
$this->setFieldSchema($fields);
}
if (empty($this->dataTableName)) {
throw new \Exception("[QuickCsv]: You must define the table to import into!");
}
if (empty($this->fieldSchema)) {
throw new \Exception("[QuickCsv]: You must define the CSV schema!");
}
$this->execQuery("DROP TABLE IF EXISTS {$this->dataTableName}", null, 'create:drop');
$schemas = array();
foreach ($this->fieldMap as $f => $v) {
$length = $v->maxlength + 1;
$type = $length > 255 ? 'TEXT' : "VARCHAR({$length})";
$schema_default = 'DEFAULT ' . (isset($v->default) ? $v->default : "''");
$schemas[] = isset($v->field) ? $v->field : "`{$f}` {$type} {$schema_default}";
}
$field_lines = implode(",\n ", $schemas);
$temporary = $this->asTemporary ? 'TEMPORARY' : '';
$sql = "
CREATE {$temporary} TABLE {$this->dataTableName}
(
`{$this->csvRecordId}` INT(9) NOT NULL AUTO_INCREMENT,
{$field_lines},
PRIMARY KEY (`{$this->csvRecordId}`)
) ENGINE=MyISAM DEFAULT CHARSET={$this->tableCharCode};
";
return $this->execQuery($sql, null, 'create');
}
public function import($path, $dataTableName = null, $schema = null) {
if (!empty($dataTableName)) {
$this->dataTableName = $dataTableName;
}
if (!empty($schema)) {
$this->setFieldSchema($schema);
}
if (empty($this->dataTableName)) {
throw new \Exception("[QuickCsv]: You must define the table to import into!");
}
if (empty($this->fieldSchema)) {
throw new \Exception("[QuickCsv]: You must define the CSV schema!");
}
$names = array();
$setters = array();
foreach ($this->fieldMap as $field => $v) {
if (!isset($v->default)) {
$names[] = $field;
continue;
}
$names[] = "@var_{$field}";
$setters[] = "{$field} = CASE @var_{$field} WHEN '' THEN {$v->default} ELSE @var_{$field} END";
}
$settter_lines = empty($setters) ? '' : 'SET ' . implode(",\n ", $setters);
$ignoreLine = $this->hasCsvHeader ? 'IGNORE 1 LINES' : '';
$field_lines = implode(",", $names);
$sql = <<<SQL
LOAD DATA LOCAL INFILE "{$path}"
INTO TABLE {$this->dataTableName}
CHARACTER SET {$this->csvCharCode}
FIELDS TERMINATED BY {$this->csvSeparator}
OPTIONALLY ENCLOSED BY {$this->csvEncloser}
LINES TERMINATED BY {$this->csvLineSep} STARTING BY {$this->csvLineStart}
{$ignoreLine}
({$field_lines})
{$settter_lines}
SQL;
return $this->execQuery($sql, null, 'import');
}
public function validateAllFields()
{
if (empty($this->fieldSchema)) {
throw new \Exception("[QuickCsv]: You must define the CSV schema!");
}
$fields = array();
foreach ($this->fieldMap as $name => $m) {
foreach ($this->validators as $prop => $vf) {
if (array_key_exists($prop, $m)) {
$vf($fields, $m, $name);
}
}
}
return $this->validateBase('validateAllFields', $fields);
}
public function validateBase($api, $fields, $additional_tables='')
{
$field_formula = array();
$conditions = array();
foreach ($fields as $f => $v) {
$flds[] = "{$v} AS {$f}";
$field_formula[] = $v;
}
$field_lines = implode(",\n ", $flds);
$condition_lines = implode("\n OR ", $field_formula);
$sql = "
SELECT
{$this->csvRecordId},
{$field_lines}
FROM
{$this->dataTableName}
WHERE
{$condition_lines}
{$additional_tables}
ORDER BY {$this->csvRecordId}
";
return $this->execQuery($sql, null, $api);
}
public function validateNonExistForeignKey($field, $foreignKey, $foreignTableName, $condition = '')
{
$sql = "
SELECT
{$this->csvRecordId}, {$field}
FROM
{$this->dataTableName}
WHERE
{$field} NOT IN
(
SELECT
{$foreignKey}
FROM
{$foreignTableName}
WHERE
1 = 1
AND {$condition}
)
ORDER BY {$this->csvRecordId}
";
return $this->execQuery($sql, null, 'validateNonExistForeignKey');
}
public function validateDuplicatedId($field)
{
if (!is_array($field)) {
$field = array($field);
}
$field_not_defaults = array();
$t1_fields = array();
$t1t2_on = array();
foreach ($field as $f) {
$field_not_defaults[] = $this->getNotDefaultFormula($f);
$t1_fields[] = "t1.{$f}";
$t1t2_on[] = "t1.{$f} = t2.{$f}";
}
$condition_field_not_defaults = implode(" AND\n ", $field_not_defaults);
$t1_fields_line = implode(", ", $t1_fields);
$fields_line = implode(", ", $field);
$t1t2_on_line = implode(" AND ", $t1t2_on);
$sql = "
SELECT t1.{$this->csvRecordId}, {$t1_fields_line}
FROM {$this->dataTableName} t1
INNER JOIN
(
SELECT {$fields_line}, COUNT(*) as ___count
FROM {$this->dataTableName}
WHERE {$condition_field_not_defaults}
GROUP BY {$fields_line}
ORDER BY NULL
) t2
ON
{$t1t2_on_line}
WHERE t2.___count > 1
ORDER BY t1.{$this->csvRecordId}
";
return $this->execQuery($sql, null, 'validateDuplicatedId');
}
public function getNotDefaultFormula($field)
{
if (empty($this->fieldSchema)) {
throw new \Exception("[QuickCsv]: You must define the CSV schema!");
}
if (!array_key_exists($field, $this->fieldMap)) {
throw new \Exception("[QuickCsv]: field must exist in the fieldSchema!");
}
$v = $this->fieldMap[$field];
if (isset($v->required)) {
$empty_check = "1=1";
} elseif (isset($v->default) && $v->default == 'NULL' ) {
$empty_check = "{$field} IS NOT NULL";
} elseif (isset($v->default)) {
$empty_check = "{$field} != {$v->default}";
} else {
$empty_check = "{$field} != ''";
}
return $empty_check;
}
public function updateFieldNumberByAutoCount($field, $baseNumber=0, $destTableName=null)
{
if (!empty($destTableName)) {
$this->destTableName = $destTableName;
}
$sql = "
UPDATE {$this->dataTableName} t10,
(
SELECT t2.{$this->csvRecordId}, t2.{$this->csvRecordId} + t1.{$field} AS {$field}
FROM {$this->dataTableName} t2,
(
SELECT MAX({$field}) AS {$field}
FROM (
SELECT {$baseNumber} AS {$field}
UNION
SELECT MAX(CAST(productId AS UNSIGNED)) AS {$field}
FROM {$this->dataTableName}
UNION
SELECT MAX(productId) AS {$field}
FROM {$this->destTableName}
) t0
) t1
) t20
SET
t10.{$field} = t20.{$field}
WHERE
t10.{$field} = 0
AND t10.{$this->csvRecordId} = t20.{$this->csvRecordId}
";
return $this->execQuery($sql, null, 'updateFieldNumberByAutoCount');
}
public function updateFieldNumberByAutoCountWithPrefix($field, $prefix, $body='#', $baseNumber=0, $destTableName=null)
{
if (!empty($destTableName)) {
$this->destTableName = $destTableName;
}
if (empty($body) || $body == '#') {
$newValue = "CONCAT({$prefix}, t2.{$this->csvRecordId} + IFNULL(t1.{$field}, 0))";
} else {
$bodylen = strlen($body);
$newValue = "CONCAT({$prefix}, LPAD(t2.{$this->csvRecordId} + IFNULL(t1.{$field}, 0), {$bodylen}, '0'))";
}
$prefixlen = strlen($prefix);
$sql = "
UPDATE {$this->dataTableName} t10,
(
SELECT t2.{$this->csvRecordId}, {$newValue} AS {$field}
FROM {$this->dataTableName} t2,
(
SELECT MAX({$field}) AS {$field}
FROM (
SELECT {$baseNumber} AS {$field}
UNION
SELECT MAX(CAST(SUBSTRING(productCode, {$prefixlen}) AS UNSIGNED)) AS {$field}
FROM {$this->dataTableName}
WHERE
CHAR_LENGTH(productCode) > {$prefixlen}
UNION
SELECT MAX(CAST(SUBSTRING(productCode, {$prefixlen}) AS UNSIGNED)) AS {$field}
FROM Product
WHERE
CHAR_LENGTH(productCode) > {$prefixlen}
) t0
) t1
) t20
SET
t10.productCode = t20.{$field}
WHERE
t10.{$field} = 0
AND t10.{$this->csvRecordId} = t20.{$this->csvRecordId}
";
return $this->execQuery($sql, null, 'updateFieldNumberByAutoCountWithPrefix');
}
public function updateExistingRecords($immediates = array())
{
$schemas = array();
$params = array();
foreach ($this->fieldMap as $f => $v) {
if (isset($v->skipped) && $v->skipped) {
continue;
}
if (array_key_exists($f, $immediates)) {
$schemas[] = "t1.{$f} = :{$f}";
$params[":{$f}"] = $immediates[$f];
unset($immediates[$f]);
} else {
$schemas[] = "t1.{$f} = t2.{$f}";
}
}
foreach ($immediates as $f => $v) {
$schemas[] = "t1.{$f} = :{$f}";
$params[":{$f}"] = $v;
}
$field_lines = implode(",\n ", $schemas);
$condition = "t1.{$this->destPrimaryKey} = t2.{$this->destPrimaryKey}";
if (is_array($this->destPrimaryKey)) {
$conditions = array();
foreach ($this->destPrimaryKey as $f) {
$conditions[] = "t1.{$f} = t2.{$f}";
}
$condition = implode(" AND\n ", $conditions);
}
$sql = "
UPDATE
{$this->destTableName} t1,
{$this->dataTableName} t2
SET
{$field_lines}
WHERE
{$condition}
";
return $this->execQuery($sql, $params, 'updateExistingRecords');
}
public function insertNonExistingRecords($immediates = array())
{
$names1 = array();
$names2 = array();
$schemas = array();
$params = array();
foreach ($this->fieldMap as $f => $v) {
if (isset($v->skip) && $v->skip) {
continue;
}
$names1[] = $f;
if (array_key_exists($f, $immediates)) {
$names2[] = ":{$f}";
$params[":{$f}"] = $immediates[$f];
unset($immediates[$f]);
} else {
$names2[] = "t2.{$f}";
}
}
foreach ($immediates as $f => $v) {
$names1[] = $f;
$names2[] = ":{$f}";
$params[":{$f}"] = $v;
}
$field_lines1 = implode(",", $names1);
$field_lines2 = implode(",", $names2);
$condition = "t1.{$this->destPrimaryKey} = t2.{$this->destPrimaryKey}";
$condition2 = "t1.{$this->destPrimaryKey} IS NULL";
if (is_array($this->destPrimaryKey)) {
$conditions = array();
$conditions2 = array();
foreach ($this->destPrimaryKey as $f) {
$conditions[] = "t1.{$f} = t2.{$f}";
$conditions2[] = "t1.{$f} IS NULL";
}
$condition = implode(" AND\n ", $conditions);
$condition2 = implode(" AND\n ", $conditions2);
}
$sql = "
INSERT INTO {$this->destTableName}
({$field_lines1})
SELECT
{$field_lines2}
FROM {$this->dataTableName} t2
LEFT JOIN {$this->destTableName} t1
ON {$condition}
WHERE
{$condition2}
ORDER BY t2.{$this->csvRecordId}
";
return $this->execQuery($sql, $params, 'insertNonExistingRecords');
}
}