Defined Type: kafka::topic

Defined in:
manifests/topic.pp

Summary

This defined type handles the creation of Kafka topics.

Overview

Examples:

Basic usage

kafka::topic { 'test':
  ensure             => present,
  zookeeper          => 'localhost:2181',
  replication_factor => 1,
  partitions         => 1,
}

Parameters:

  • ensure (Optional[String[1]]) (defaults to: undef)

    Should the topic be created.

  • zookeeper (Optional[String[1]]) (defaults to: undef)

    The connection string for the ZooKeeper connection in the form host:port. Multiple hosts can be given to allow fail-over. Kafka < 3.0.0 only!

  • bootstrap_server (Optional[String[1]]) (defaults to: undef)

    The Kafka server to connect to in the form host:port. Kafka >= 2.2.0 only!

  • replication_factor (Integer) (defaults to: 1)

    The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.

  • partitions (Integer) (defaults to: 1)

    The number of partitions for the topic being created or altered. If not supplied for create, defaults to the cluster default.

  • bin_dir (String[1]) (defaults to: '/opt/kafka/bin')

    The directory where the file kafka-topics.sh is located.

  • config (Optional[Hash[String[1],String[1]]]) (defaults to: undef)

    A topic configuration override for the topic being created or altered. See the Kafka documentation for full details on the topic configs.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'manifests/topic.pp', line 37

define kafka::topic (
  Optional[String[1]] $ensure                 = undef,
  Optional[String[1]] $zookeeper              = undef,
  Optional[String[1]] $bootstrap_server       = undef,
  Integer   $replication_factor               = 1,
  Integer   $partitions                       = 1,
  String[1] $bin_dir                          = '/opt/kafka/bin',
  Optional[Hash[String[1],String[1]]] $config = undef,
) {
  $_zookeeper          = "--zookeeper ${zookeeper}"
  $_bootstrap_server   = "--bootstrap-server ${bootstrap_server}"
  $_replication_factor = "--replication-factor ${replication_factor}"
  $_partitions         = "--partitions ${partitions}"

  if !$zookeeper and !$bootstrap_server {
    fail('Either zookeeper or bootstrap_server parameter must be defined!')
  }

  if $zookeeper {
    $_connection = $_zookeeper
  } else {
    $_connection = $_bootstrap_server
  }

  if $config {
    $_config_array = $config.map |$key, $value| { "--config ${key}=${value}" }
    $_config = join($_config_array, ' ')
  } else {
    $_config = ''
  }

  if $ensure == 'present' {
    exec { "create topic ${name}":
      path    => "/usr/bin:/usr/sbin/:/bin:/sbin:${bin_dir}",
      command => "kafka-topics.sh --create ${_connection} ${_replication_factor} ${_partitions} --topic ${name} ${_config}",
      unless  => "kafka-topics.sh --list ${_connection} | grep -x ${name}",
    }
  }
}