Puppet Class: kafka_connect::manage_connectors::connector
- Defined in:
- manifests/manage_connectors/connector.pp
Summary
Manages individual Kafka Connect connectors.Overview
KC connector class.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'manifests/manage_connectors/connector.pp', line 10
class kafka_connect::manage_connectors::connector (
Kafka_connect::Connectors $connectors_data,
) {
assert_private()
$stdlib_version = load_module_metadata('stdlib')['version']
$connectors_data.each |$connector| {
$connector_file_name = $connector[0]
$connector_name = $connector[1]['name']
$connector_config = $connector[1]['config']
$connector_full_config = {
name => $connector_name,
config => $connector_config,
}
$connector_full_config_content = if versioncmp($stdlib_version, '9.0.0') < 0 {
to_json($connector_full_config)
} else {
stdlib::to_json($connector_full_config)
}
$connector_ensure = $connector[1]['ensure'] ? {
/^(present|running|paused)$/ => 'present',
'absent' => 'absent',
default => 'present',
}
$connector_state_ensure = $connector[1]['ensure'] ? {
/^(present|running)$/ => 'RUNNING',
'paused' => 'PAUSED',
'absent' => undef,
default => undef,
}
if ($connector_ensure == 'present' and !$connector_config) {
fail("Connector config required, unless ensure is set to absent. \
\n Validation error on ${connector_name} data, please correct. \n")
}
if $kafka_connect::owner {
$_owner = $kafka_connect::owner
} else {
$_owner = $kafka_connect::user
}
file { "${kafka_connect::connector_config_dir}/${connector_file_name}" :
ensure => $connector_ensure,
owner => $_owner,
group => $kafka_connect::group,
mode => $kafka_connect::connector_config_file_mode,
content => $connector_full_config_content,
before => Kc_connector[$connector_name],
}
kc_connector { $connector_name :
ensure => $connector_ensure,
config_file => "${kafka_connect::connector_config_dir}/${connector_file_name}",
connector_state_ensure => $connector_state_ensure,
hostname => $kafka_connect::hostname,
port => $kafka_connect::rest_port,
enable_delete => $kafka_connect::enable_delete,
restart_on_failed_state => $kafka_connect::restart_on_failed_state,
}
}
}
|