-
Notifications
You must be signed in to change notification settings - Fork 1
/
incremental_insert.etl
52 lines (43 loc) · 1.75 KB
/
incremental_insert.etl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#!/usr/bin/env ruby
require_relative 'init'
require 'kiba/plus/job'
require 'kiba/plus/source/mysql'
require 'kiba/plus/destination/pg_bulk2'
# Connection URLs for the MySQL source and the PostgreSQL destination.
# They default to the local development databases and can be overridden via
# the ETL_SOURCE_URL / ETL_DEST_URL environment variables, so credentials and
# hosts do not have to be edited into the script.
SOURCE_URL = ENV.fetch('ETL_SOURCE_URL', 'mysql://root@localhost/shopperplus').freeze
DEST_URL = ENV.fetch('ETL_DEST_URL', 'postgresql://hooopo@localhost:5432/crm2_dev').freeze
# Before any rows are moved, start a "customer" job record via
# Kiba::Plus::Job#start. The value it returns is kept in @job_id so that
# post_process can hand the same id back when completing the job.
pre_process do
  job = Kiba::Plus::Job.new(
    connect_url: DEST_URL,
    start_at: Time.now,
    job_name: "customer"
  )
  @job_id = job.start
end
# Timestamp of the previous pull for the "customer" job, used below to limit
# the source query to recently updated rows. This is a top-level statement
# rather than part of pre_process, so it is presumably evaluated before the
# new job is started — verify against Kiba's run order. May be nil on a first
# run (TODO confirm Job#last_pull_at's contract).
last_pull_at = Kiba::Plus::Job.new(connect_url: DEST_URL, job_name: "customer").last_pull_at
# Incremental MySQL source: select customers updated after last_pull_at.
#
# When there is no previous pull (last_pull_at is nil), the WHERE clause is
# omitted. The first run then loads every row instead of comparing updated_at
# against an empty string.
#
# Values that respond to #strftime (Time/Date) are written as a plain
# 'YYYY-MM-DD HH:MM:SS' literal, because Time#to_s appends a UTC offset
# (e.g. "+0800") that is not part of MySQL's DATETIME literal format. Other
# values (e.g. a timestamp string) are interpolated via #to_s as before.
#
# NOTE(review): assumes last_pull_at is in the same time zone MySQL uses for
# updated_at — confirm. strftime drops fractional seconds, so rows at the
# boundary second may be pulled again. That is presumably harmless given
# :unique_by => :id on the destination — verify.
incremental_where =
  if last_pull_at.nil?
    ''
  else
    pulled_since =
      if last_pull_at.respond_to?(:strftime)
        last_pull_at.strftime('%Y-%m-%d %H:%M:%S')
      else
        last_pull_at.to_s
      end
    " WHERE updated_at > '#{pulled_since}'"
  end
source Kiba::Plus::Source::Mysql, {
  :connect_url => SOURCE_URL,
  :query => %Q{SELECT id, email, 'hooopo' AS first_name, 'Wang' AS last_name FROM customers#{incremental_where}},
  :last_pull_at => last_pull_at,
  :incremental => true
}
# Bulk-load rows into the PostgreSQL "customers" table. The table is not
# truncated first, and incremental mode is used with :id as the unique key
# (presumably so re-pulled rows replace existing ones — see PgBulk2).
# The columns listed here match the SELECT list of the source query.
destination Kiba::Plus::Destination::PgBulk2, {
  connect_url: DEST_URL,
  table_name: "customers",
  truncate: false,
  columns: %i[id email first_name last_name],
  incremental: true,
  unique_by: :id
}
# After the load, mark the job started in pre_process (identified by @job_id)
# as complete, then print how many rows the destination table now holds.
post_process do
  Kiba::Plus::Job.new(
    :connect_url => DEST_URL,
    :job_id => @job_id,
    :job_name => "customer"
  ).complete

  # Short-lived reporting connection; always closed, even if the query raises.
  # PG is not required in this file — presumably loaded by init or pg_bulk2.
  conn = PG.connect(DEST_URL)
  begin
    result = conn.query("SELECT COUNT(*) AS num FROM customers")
    # COUNT(*) is the table's total row count, not the number of rows this run inserted.
    puts "Insert total: #{result.first['num']}"
  ensure
    conn.close
  end
end
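# Sample output (captured 2016-05-16). NOTE(review): the TRUNCATE line below does
# not match :truncate => false in the destination, so this output may be stale.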
# I, [2016-05-16T01:53:36.832565 #87909] INFO -- : TRUNCATE TABLE customers;
# I, [2016-05-16T01:53:36.841770 #87909] INFO -- : COPY customers (id, email, first_name, last_name) FROM STDIN WITH DELIMITER ',' NULL '\N' CSV
# Insert total: 428972