8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# File 'manifests/manage_connectors/connector.pp', line 8
class kafka_connect::manage_connectors::connector (
Kafka_connect::Connectors $connectors_data,
) {
assert_private()
$connectors_data.each |$connector| {
$connector_file_name = $connector[0]
$connector_name = $connector[1]['name']
$connector_config = $connector[1]['config']
$connector_full_config = {
name => $connector_name,
config => $connector_config,
}
$connector_ensure = $connector[1]['ensure'] ? {
/^(present|running|paused)$/ => 'present',
'absent' => 'absent',
default => 'present',
}
$connector_state_ensure = $connector[1]['ensure'] ? {
/^(present|running)$/ => 'RUNNING',
'paused' => 'PAUSED',
'absent' => undef,
default => undef,
}
if ($connector_ensure == 'present' and !$connector_config) {
fail("Connector config required, unless ensure is set to absent. \
\n Validation error on ${connector_name} data, please correct. \n")
}
if $kafka_connect::owner {
$_owner = $kafka_connect::owner
} else {
$_owner = $kafka_connect::user
}
file { "${kafka_connect::connector_config_dir}/${connector_file_name}" :
ensure => $connector_ensure,
owner => $_owner,
group => $kafka_connect::group,
mode => $kafka_connect::connector_config_file_mode,
content => stdlib::to_json($connector_full_config),
before => Kc_connector[$connector_name],
}
kc_connector { $connector_name :
ensure => $connector_ensure,
config_file => "${kafka_connect::connector_config_dir}/${connector_file_name}",
connector_state_ensure => $connector_state_ensure,
hostname => $kafka_connect::hostname,
port => $kafka_connect::rest_port,
enable_delete => $kafka_connect::enable_delete,
restart_on_failed_state => $kafka_connect::restart_on_failed_state,
}
}
}
|