37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'manifests/topic.pp', line 37
# @summary Creates a Kafka topic with kafka-topics.sh unless it is already listed.
#
# @param ensure
#   The topic is only created when this is 'present'. Any other value,
#   including the default undef, declares no resources.
# @param zookeeper
#   Connection string passed to kafka-topics.sh as --zookeeper. It is used in
#   preference to bootstrap_server when both are given.
# @param bootstrap_server
#   Connection string passed as --bootstrap-server when zookeeper is not set.
# @param replication_factor
#   Value passed as --replication-factor when the topic is created.
# @param partitions
#   Value passed as --partitions when the topic is created.
# @param bin_dir
#   Directory appended to the exec search path so kafka-topics.sh is found.
# @param config
#   Topic settings; every pair is rendered as a "--config key=value" argument.
define kafka::topic (
  Optional[String[1]] $ensure                 = undef,
  Optional[String[1]] $zookeeper              = undef,
  Optional[String[1]] $bootstrap_server       = undef,
  Integer $replication_factor                 = 1,
  Integer $partitions                         = 1,
  String[1] $bin_dir                          = '/opt/kafka/bin',
  Optional[Hash[String[1],String[1]]] $config = undef,
) {
  # Validate before building any command fragments so that no flag is ever
  # interpolated from an undefined connection parameter.
  unless $zookeeper or $bootstrap_server {
    fail('Either zookeeper or bootstrap_server parameter must be defined!')
  }

  # zookeeper wins when both connection parameters are supplied.
  if $zookeeper {
    $_connection = "--zookeeper ${zookeeper}"
  } else {
    $_connection = "--bootstrap-server ${bootstrap_server}"
  }

  if $config {
    $_config_array = $config.map |$key, $value| { "--config ${key}=${value}" }
    $_config = join($_config_array, ' ')
  } else {
    $_config = ''
  }

  if $ensure == 'present' {
    exec { "create topic ${name}":
      path    => "/usr/bin:/usr/sbin/:/bin:/sbin:${bin_dir}",
      command => "kafka-topics.sh --create ${_connection} --replication-factor ${replication_factor} --partitions ${partitions} --topic ${name} ${_config}",
      # -F matches the topic name as a literal string rather than a regular
      # expression: with plain `grep -x`, a name such as "a.b" would also match
      # an existing topic "a_b" and the creation would be skipped. -x still
      # requires the whole output line to equal the name.
      unless  => "kafka-topics.sh --list ${_connection} | grep -Fx ${name}",
    }
  }
}
|