module unde.file_manager.copy_paths;

/*
 * Copying of paths by running an external `rsync` process.
 *
 * The rsync output is parsed line by line: error lines are stored in the
 * map database as messages, every other file line is used to track how
 * much data has already arrived at the destination so that progress and
 * an estimated end time can be written to the map database.
 */

import unde.global_state;
import unde.lsblk;
import unde.lib;
import unde.scan;
import unde.path_mnt;
import unde.slash;

import std.stdio;
import std.conv;
import core.stdc.stdlib;
import std.math;
import berkeleydb.all;
import std.stdint;
import core.stdc.stdlib;
import std.string;
import std.algorithm.sorting;
import std.utf;
import std.process;
import std.concurrency;
import core.time;
import core.thread;
import std.datetime;
import std.regex;
import std.algorithm.sorting;

/*mkstemp, fchmod. strerror, errno, close*/
import core.stdc.string;
import core.stdc.errno;

import derelict.sdl2.sdl;
import derelict.sdl2.ttf;
import derelict.sdl2.image;

import std.file;
import core.sys.posix.unistd;
import core.sys.posix.stdlib;
import core.sys.posix.sys.stat;

immutable DRect drect_zero = DRect(0, 0, 0, 0);

/**
 * Stores `error` as the message of the map-db record of `path` and then
 * bumps `newest_msg_time` on the records found under progressively
 * shortened keys.
 *
 * Params:
 *   cgs   = copier state (database handle, transaction, lsblk info)
 *   path  = path the message belongs to
 *   error = message text (stored through `to_char_array!80`)
 *
 * Throws: Exception when a record can't be written to the map database.
 */
private void
save_errors(FMGlobalState cgs, PathMnt path, string error)
{
    cgs.recommit();
    Dbt key, data;
    string path0 = path.get_key(cgs.lsblk);
    key = path0;
    auto res = cgs.db_map.get(cgs.txn, &key, &data);
    cgs.OIT++;

    // When the record doesn't exist yet a default-initialized one is written.
    RectSize rectsize;
    if (res == 0)
        rectsize = data.to!(RectSize);

    ulong curr_time = Clock.currTime().toUnixTime();
    rectsize.msg = to_char_array!80(error);
    rectsize.msg_time = curr_time;
    rectsize.msg_color = 0x80FF8080; // ARGB

    data = rectsize;
    //writefln("WRITE - %s - %s", path0, rectsize);
    res = cgs.db_map.put(cgs.txn, &key, &data);
    if (res != 0)
        throw new Exception("Path info to map-db not written");
    cgs.OIT++;
    cgs.recommit();

    // Cut the key at its last "\0" again and again until only the part up
    // to (and including) the first "\0" is left; every record found on the
    // way gets its newest_msg_time updated.
    // NOTE(review): `last` starts at 0, so the loop is skipped completely
    // when the first "\0" is at index 0 - confirm this is intended.
    ssize_t first, last;
    first = path0.indexOf("\0");
    while (first != last)
    {
        last = path0.lastIndexOf("\0");
        if (last > first)
            path0 = path0[0..last];
        else
            path0 = path0[0..first+1];
        key = path0;
        res = cgs.db_map.get(cgs.txn, &key, &data);
        cgs.OIT++;
        if (res == 0)
        {
            rectsize = data.to!(RectSize);

            if (curr_time > rectsize.newest_msg_time)
            {
                rectsize.newest_msg_time = curr_time;

                data = rectsize;
                //writefln("WRITE - %s - %s", path0, rectsize);
                res = cgs.db_map.put(cgs.txn, &key, &data);
                if (res != 0)
                    throw new Exception("Path info to map-db not written");
                cgs.OIT++;
                cgs.recommit();
            }
        }
    }
}

/**
 * Reads the map-db record of `path`.
 *
 * The last character of the key returned by `path.get_key` is dropped
 * before the lookup (callers in this file pass paths with a trailing
 * separator).
 *
 * Returns: the stored RectSize, or a default-initialized RectSize when
 *          the key is not in the database.
 */
private RectSize
get_path_rectsize(FMGlobalState cgs, PathMnt path)
{
    string path0 = path.get_key(cgs.lsblk);
    path0 = path0[0..$-1];
    Dbt key = path0;
    Dbt data;
    RectSize rectsize;
    auto res = cgs.db_map.get(cgs.txn, &key, &data);
    cgs.OIT++;
    if (res == 0)
    {
        rectsize = data.to!(RectSize);
    }
    return rectsize;
}

/**
 * Writes progress information into the map-db record of `copy_to_mnt`.
 * Nothing is written when the record doesn't exist.
 *
 * Params:
 *   cgs           = copier state
 *   copy_to_mnt   = destination whose record is updated
 *   progress_path = text stored in the record's `path` field
 *   estimate_end  = estimated end time (callers pass unix time)
 *   progress      = from 0 till 10000; 10000 and above clears the
 *                   progress information of the record
 *
 * Throws: Exception when the record can't be written to the map database.
 */
package void
update_progress(FMGlobalState cgs, PathMnt copy_to_mnt,
        string progress_path,
        long estimate_end,
        int progress // from 0 till 10000
        )
{
    cgs.recommit();
    string copy_to0 = copy_to_mnt.get_key(cgs.lsblk);

    Dbt key;
    Dbt data;
    copy_to0 = copy_to0[0..$-1];

    key = copy_to0;
    //writefln("Get %s", copy_to0.replace("\0", SL));
    int res = cgs.db_map.get(cgs.txn, &key, &data);
    cgs.OIT++;
    if (res == 0)
    {
        RectSize rectsize;
        rectsize = data.to!(RectSize);

        if (progress >= 10000)
        {
            // Finished: wipe the progress information.
            rectsize.path[0..$] = char.init;
            rectsize.estimate_end = 0;
            rectsize.progress = 0;
            rectsize.show_info = InfoType.None;
        }
        else
        {
            rectsize.path = to_char_array!80(progress_path);
            rectsize.estimate_end = estimate_end;
            rectsize.progress = progress;
            rectsize.show_info = InfoType.Progress;
        }

        //writefln("Put progress %s", progress);
        data = rectsize;
        res = cgs.db_map.put(cgs.txn, &key, &data);
        if (res != 0)
            throw new Exception("Path info to map-db not written");
        cgs.OIT++;
        cgs.recommit();
    }
}

version (Windows)
{
    /**
     * Converts `C:\dir\file` to `/cygdrive/c/dir/file`.
     * Paths without a drive letter only get their separators replaced.
     */
    private string
    windows_path_to_cygwin(FMGlobalState cgs, string path)
    {
        // Length check: paths shorter than "X:" have no drive prefix.
        if (path.length >= 2 && path[1] == ':')
        {
            path = "\\cygdrive\\" ~ toLower(path[0..1]) ~ path[2..$];
        }
        return path.replace(SL, "/");
    }

    /**
     * Converts `/cygdrive/c/dir/file` to `C:\dir\file`.
     * Other paths only get their separators replaced.
     */
    private string
    cygwin_path_to_windows(FMGlobalState cgs, string path)
    {
        // Length check: a drive letter must follow "/cygdrive/".
        if (path.length > 10 && path.startsWith("/cygdrive/"))
        {
            path = toUpper(path[10..11]) ~ ":" ~ path[11..$];
        }
        return path.replace("/", SL);
    }

    unittest
    {
        string win_path = "C:\\TEST\\";
        string cygwin_path = "/cygdrive/c/TEST/";

        string cygwin_path2 = windows_path_to_cygwin(null, win_path);
        string win_path2 = cygwin_path_to_windows(null, cygwin_path);
        assert(win_path == win_path2, win_path ~ " == " ~ win_path2);
        assert(cygwin_path == cygwin_path2, cygwin_path ~ " == " ~ cygwin_path2);
    }
}

/**
 * Copies `path` to `copy_to` with rsync and tracks the progress.
 *
 * rsync is started with `--inplace`, so the size of the destination file
 * can be polled while it is being written. Lines of the rsync output are
 * handled like this:
 *   - error lines: the message is saved with `save_errors`;
 *   - "deleting ..." lines: the size of the deleted path is subtracted
 *     from the current destination size;
 *   - file lines: the destination file is polled until its disk usage
 *     reaches the disk usage recorded for the source file, progress is
 *     written with `update_progress` about once per second.
 *
 * Params:
 *   cgs         = copier state
 *   path        = source path
 *   copy_to     = destination path
 *   remove_flag = pass `--delete` to rsync
 *   tid         = thread that gets the `MsgState.Send` notification
 *
 * Returns: number of error lines found in the rsync output.
 */
package int
copy_path(FMGlobalState cgs, PathMnt path, string copy_to, bool remove_flag, Tid tid)
{
    cgs.recommit();
    int num_errors;
    string archieve_option = "-a";

    version(Posix)
    {
        // Check whether chmod works in the destination directory; when it
        // doesn't, rsync is run without permission preserving (-p).
        string to_directory = getParent(copy_to);
        // mkstemp() takes a writable NUL-terminated C string, so the
        // terminator has to be part of the buffer.
        char[] temp_template = (to_directory ~ "/check_chmod_XXXXXXX\0").dup;
        int fd = mkstemp(temp_template.ptr);
        if (fd >= 0)
        {
            string temp_file = temp_template[0..$-1].idup;
            int res = fchmod(fd, std.conv.octal!666);
            if (res != 0)
            {
                archieve_option = "-rltgoD";
            }
            close(fd);
            remove(temp_file);
        }
        else
        {
            string error = fromStringz(strerror(errno)).idup();
            // writeln: the message is data, not a format string.
            writeln(error);
        }
    }

    string path_for_rsync = path;
    string copy_to_for_rsync = copy_to;
    version (Windows)
    {
        path_for_rsync = windows_path_to_cygwin(cgs, path_for_rsync);
        copy_to_for_rsync = windows_path_to_cygwin(cgs, copy_to_for_rsync);
    }

    /* I very want to use -S (--sparse) option here, but it not compatible with --inplace
       but inplace necessary for progress algorithm */
    string[] rsync_args = ["rsync", archieve_option, "-vuH", "--inplace"];
    if (remove_flag)
        rsync_args ~= "--delete";
    rsync_args ~= [path_for_rsync, copy_to_for_rsync];
    auto rsync_pipes = pipeProcess(rsync_args, Redirect.stdout | Redirect.stderrToStdout);
    scope(exit) wait(rsync_pipes.pid);

    if (copy_to[$-1] == SL[0])
    {
        string name = path.path[path.path.lastIndexOf(SL)+1..$];
        copy_to ~= name ~ SL;
    }
    else copy_to ~= SL;

    PathMnt copy_to_mnt = PathMnt(cgs.lsblk, copy_to);

    if (path._next[$-1] != SL[0])
        path._next ~= SL;

    RectSize goal_rectsize = get_path_rectsize(cgs, path);
    RectSize current_rectsize = get_path_rectsize(cgs, copy_to_mnt);

    writefln("Goal %s", goal_rectsize.disk_usage);
    writefln("Current %s", current_rectsize.disk_usage);

    long disk_usage_at_start = current_rectsize.disk_usage;
    long last_measure = Clock.currTime().stdTime;
    bool first_measure = true;
    long disk_usage_at_last_measure = current_rectsize.disk_usage;
    long[] measurements;
    long estimate_end;

    cgs.commit();
    string path_from = path.path;
    if (path_from[$-1] == SL[0]) path_from = path_from[0..$-1];
    send(tid, thisTid, path_from, copy_to[0..$-1], false, MsgState.Send);
    MsgState resent;
    //writefln("%s waits resent", thisTid);
    do
    {
        receive( (MsgState msg) { resent = msg; } );
    } while (resent != MsgState.Resent);
    //writefln("%s resent got", thisTid);
    cgs.recommit();

    // The patterns don't depend on the line, compile them only once.
    auto re_failed      = regex(`rsync: ([^()"]*)\(?"?(.*?)"?\)? failed: (.*)`);
    auto re_failed_to   = regex(`([^()"]*) failed to [^"]* "(.*?)": (.*)`);
    auto re_noise       = regex(`^(\*\*\*.*\*\*\*|sending incremental file list||sent .*|total .*|.*some files/attrs were not transferred.*)$`);
    auto re_deleting    = regex(`^deleting (.*)$`);
    auto re_symlink     = regex(`^(.*) -> (.*)$`);
    auto re_hardlink    = regex(`^(.*) => (.*)$`);
    auto re_created_dir = regex(`^created directory (.*)$`);
    auto re_skipping    = regex(`^skipping non-regular file`);

    foreach (rsync_line; rsync_pipes.stdout.byLine)
    {
        cgs.recommit();
        //writefln("rsync_line: %s", rsync_line);
        // exits on exits of parent
        receiveTimeout( 0.seconds,
                (OwnerTerminated ot) {
                    writefln("Abort copying due stopping parent");
                    cgs.finish = true;
                } );
        if (cgs.finish) break;

        // ---- error lines ----
        auto match = matchFirst(rsync_line, re_failed);
        if (!match)
            match = matchFirst(rsync_line, re_failed_to);
        if (match)
        {
            string operation = match[1].idup();
            string err_path = match[2].idup();
            string error = match[3].idup();
            // Both captures may be empty, so the length is checked
            // before indexing.
            while (operation.length > 0 && operation[$-1] == ' ')
                operation = operation[0..$-1];
            if (err_path.length == 0 || err_path[0] != SL[0])
                err_path = copy_to ~ err_path;

            // Attach the message to the nearest existing ancestor.
            while (!exists(err_path))
            {
                writefln("copy_path: %s doesn't exists", err_path);
                err_path = getParent(err_path);
                if (err_path == "") err_path = SL;
            }

            PathMnt err_pathmnt = PathMnt(cgs.lsblk, err_path);

            num_errors++;

            save_errors(cgs, err_pathmnt, operation ~": "~error);
            //writefln("[%s, %s, %s]", operation, err_path, error);
            continue;
        }

        // ---- informational lines without a path ----
        if (matchFirst(rsync_line, re_noise))
            continue;

        // ---- deleted paths ----
        match = matchFirst(rsync_line, re_deleting);
        if (match)
        {
            string del_path = match[1].idup();
            version(Windows)
            {
                del_path = cygwin_path_to_windows(cgs, del_path);
            }
            if (del_path.length == 0 || del_path[0] != SL[0])
                del_path = copy_to ~ del_path;

            PathMnt del_path_mnt = PathMnt(cgs.lsblk, del_path);

            if (del_path_mnt._next[$-1] != SL[0])
                del_path_mnt._next ~= SL;

            RectSize del_rectsize = get_path_rectsize(cgs, del_path_mnt);

            current_rectsize.disk_usage -= del_rectsize.disk_usage;

            writefln("del path %s, usage %s", del_path, del_rectsize.disk_usage);
            continue;
        }

        // ---- transferred paths ----
        string line = rsync_line.idup();
        match = matchFirst(rsync_line, re_symlink);
        if (match)
        {
            line = match[1].idup();
        }
        else
        {
            match = matchFirst(rsync_line, re_hardlink);
            if (match)
            {
                line = match[2].idup();
            }
            else if (matchFirst(rsync_line, re_created_dir)
                    || matchFirst(rsync_line, re_skipping))
            {
                continue;
            }
        }

        version(Windows)
        {
            line = cygwin_path_to_windows(cgs, line);
        }

        string path_wo_slash = path[0..$-1];
        string cur_path = path[0..path_wo_slash.lastIndexOf(SL)+1] ~ line;
        string copy_to_wo_slash = copy_to[0..$-1];
        string copy_to_path = copy_to[0..copy_to_wo_slash.lastIndexOf(SL)+1] ~ line;

        if (path.path == cur_path)
        {
            copy_to_path = copy_to[0..$-1];
        }

        if (path.path[$-1] == SL[0])
        {
            cur_path = path ~ line;
            copy_to_path = copy_to ~ line;
        }

        PathMnt cur_path_mnt = PathMnt(cgs.lsblk, cur_path);

        if (cur_path_mnt._next[$-1] != SL[0])
            cur_path_mnt._next ~= SL;

        //writefln("%s, %s", cur_path_mnt._next, copy_to_path);
        RectSize cur_rectsize = get_path_rectsize(cgs, cur_path_mnt);

        long current_disk_usage = current_rectsize.disk_usage;

        //writefln("path %s, size %s", line,
        //        cur_rectsize.disk_usage);

        long disk_usage;
        int error = 0;
        int tries = 5;
        do{
            error = 0;
            try{
                auto de = DirEntry(copy_to_path);
                if (!de.isSymlink() && !de.isDir())
                {
                    // Regular file: poll the destination until it has
                    // the disk usage recorded for the source.
                    long last_disk_usage = 0;
                    int the_same_disk_usage_times = 0;
                    do
                    {
                        long stdtime = Clock.currTime().stdTime;
                        long unixtime = Clock.currTime().toUnixTime();

                        version(Posix)
                        {
                            de = DirEntry(copy_to_path);
                            disk_usage = de.statBuf.st_blocks*512;
                        }
                        else version(Windows)
                        {
                            disk_usage = getFileSizeOnDisk(copy_to_path);
                        }
                        if (disk_usage == last_disk_usage)
                        {
                            the_same_disk_usage_times++;
                            int number_tries = 5;
                            if (path.path == cur_path)
                                number_tries = 500;
                            if (the_same_disk_usage_times > number_tries)
                            {
                                writefln("FILE DOESN'T GROW %s (current=%s), %s",
                                        cur_rectsize.disk_usage, disk_usage, copy_to_path);
                                break;
                            }
                        }
                        else
                        {
                            the_same_disk_usage_times = 0;
                        }
                        last_disk_usage = disk_usage;

                        current_rectsize.disk_usage = current_disk_usage+disk_usage;

                        // One measurement per second (stdTime counts
                        // hecto-nanoseconds).
                        if (first_measure || stdtime > (last_measure+10_000_000))
                        {
                            if (first_measure)
                            {
                                last_measure = stdtime;
                                // Clear the flag, otherwise every pass
                                // would take a measurement.
                                first_measure = false;
                            }
                            last_measure += 10_000_000;
                            measurements ~= (current_rectsize.disk_usage - disk_usage_at_last_measure);

                            // Keep the 20 most recent measurements.
                            if (measurements.length > 20)
                            {
                                measurements = measurements[1..$];
                            }

                            //writefln("measurements=%s", measurements);

                            // Sort a copy: sorting `measurements` itself
                            // would destroy its chronological order.
                            long[] sorted = measurements.dup;
                            sort!("a < b")(sorted);
                            size_t len = sorted.length;

                            long slow   = sorted[len/5];
                            long fast   = sorted[len*4/5];
                            long median = sorted[len/2];
                            //writefln("min=%s, max=%s, avg=%s", slow, fast, median);

                            long remaining = goal_rectsize.disk_usage - current_rectsize.disk_usage;

                            long max_estimate_end = long.max;
                            if (slow > 0) max_estimate_end = unixtime + remaining/slow;
                            long min_estimate_end = long.max;
                            if (fast > 0) min_estimate_end = unixtime + remaining/fast;

                            //writefln("min_estimate_end=%s, max_estimate_end=%s, estimate_end=%s", min_estimate_end, max_estimate_end, estimate_end);
                            if (min_estimate_end < unixtime) min_estimate_end = unixtime;

                            //writefln("Extended unixtime=%s, min_estimate_end=%s, max_estimate_end=%s, estimate_end=%s", unixtime, min_estimate_end, max_estimate_end, estimate_end);

                            // The shown estimate is replaced only when it
                            // leaves the [min, max] corridor.
                            if (estimate_end > max_estimate_end || estimate_end < min_estimate_end)
                            {
                                estimate_end = long.max;
                                if (median > 0) estimate_end = unixtime + remaining/median;
                                //writefln("new estimate_end=%s", estimate_end);
                            }

                            // 10000 is reserved for "finished", so the
                            // value is kept inside 0..9999.
                            long progress_long = 0;
                            long total = goal_rectsize.disk_usage - disk_usage_at_start;
                            if (total > 0)
                                progress_long = 10000*(current_rectsize.disk_usage - disk_usage_at_start)/total;
                            if (progress_long >= 10000)
                                progress_long = 9999;
                            if (progress_long < 0)
                                progress_long = 0;
                            int progress = cast(int)progress_long;
                            update_progress(cgs, copy_to_mnt,
                                    rsync_line.idup(),
                                    estimate_end,
                                    progress // from 0 till 10000
                                    );

                            disk_usage_at_last_measure = current_rectsize.disk_usage;
                        }

                        if (disk_usage < cur_rectsize.disk_usage)
                        {
                            //writefln("%s < %s", disk_usage, cur_rectsize.disk_usage);
                            cgs.commit();
                            cgs.recommit();
                            Thread.sleep( dur!("msecs")( 200 ) );
                        }
                    }
                    while (disk_usage < cur_rectsize.disk_usage);

                    current_rectsize.disk_usage = current_disk_usage + cur_rectsize.disk_usage;
                }
                else if (!de.isSymlink())
                {
                    // Directory: its size is taken from the source.
                    version(Posix)
                    {
                        de = DirEntry(cur_path);
                        disk_usage = de.statBuf.st_blocks*512;
                    }
                    else version(Windows)
                    {
                        disk_usage = getFileSizeOnDisk(cur_path);
                    }
                    current_rectsize.disk_usage = current_disk_usage + disk_usage;
                    /*writefln("Dir or Symlink. path %s, size %s", rsync_line,
                            disk_usage);*/
                }
            }
            catch (Exception e)
            {
                // The path may not exist yet, retry after a pause.
                error = 1;
            }
            tries--;
            if (error)
            {
                Thread.sleep( dur!("msecs")( 200 ) );
            }
        }
        while (error && tries > 0);

        // Check `error`, not `tries`: the last try may have succeeded.
        if (error)
        {
            writefln("Can't open path %s - %s, size %s", cur_path, copy_to_path,
                    cur_rectsize.disk_usage);
        }

        /*writefln("path %s, size %s, current %s", rsync_line,
                cur_rectsize.disk_usage, disk_usage);*/

        /*writefln("current_rectsize.disk_usage=%s, goal_rectsize.disk_usage=%s",
                current_rectsize.disk_usage,
                goal_rectsize.disk_usage);*/
    }

    // 10000 clears the progress information of the destination.
    update_progress(cgs, copy_to_mnt,
            "",
            Clock.currTime().toUnixTime(),
            10000);

    return num_errors;
}

/**
 * Thread function of a copier: copies all `paths` (in sorted order) to
 * `copy_to` using its own FMGlobalState.
 *
 * A caught throwable is sent to `tid`; at the end `thisTid` is sent to
 * `tid` in any case.
 */
private void
start_copy_paths(shared LsblkInfo[string] lsblk, immutable string[] paths, string copy_to, bool remove, Tid tid)
{
    writefln("Start copying %s, remove=%s", paths, remove);

    try {
        FMGlobalState cgs = new FMGlobalState();
        scope(exit)
        {
            destroy(cgs);
        }

        cgs.lsblk = to!(LsblkInfo[string])(lsblk);

        string[] paths_dup = paths.dup;
        sort!("a < b")(paths_dup);
        foreach(path; paths_dup)
        {
            copy_path(cgs, PathMnt(cgs.lsblk, path), copy_to, remove, tid);
            cgs.commit();
        }
    } catch (shared(Throwable) exc) {
        send(tid, exc);
    }

    writefln("Finish copying %s", paths);
    send(tid, thisTid);
}

/**
 * Spawns a copier thread for `paths` and registers it in `gs.copiers`.
 * Nothing is started when a copier with the same paths is already
 * registered.
 *
 * Returns: always 0.
 */
int copy_paths(GlobalState gs, immutable string[] paths, string copy_to, bool remove)
{
    foreach(tid, paths2; gs.copiers)
    {
        if (paths2 == paths)
        {
            writefln("Copy %s already in work", paths);
            return 0;
        }
    }

    shared LsblkInfo[string] lsblk = to!(shared LsblkInfo[string])(gs.lsblk);
    auto tid = spawn(&start_copy_paths, lsblk, paths, copy_to, remove, thisTid);
    gs.copiers[tid] = paths.dup();
    return 0;
}

/**
 * Placeholder, does nothing here.
 */
void check_copiers(GlobalState gs)
{
    // Look check_scanners
}