Puppet Class: kafka_connect::manage_connectors::connector

Defined in:
manifests/manage_connectors/connector.pp

Summary

Manages individual Kafka Connect connectors.

Overview

KC connector class.

Parameters:

  • connectors_data (Kafka_connect::Connectors)

    Hash of connector names and their corresponding data.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'manifests/manage_connectors/connector.pp', line 10

class kafka_connect::manage_connectors::connector (
  Kafka_connect::Connectors $connectors_data,
) {
  assert_private()

  $connectors_data.each |$connector| {
    $connector_file_name = $connector[0]
    $connector_name      = $connector[1]['name']
    $connector_config    = $connector[1]['config']

    $connector_full_config = {
      name   => $connector_name,
      config => $connector_config,
    }

    $connector_ensure = $connector[1]['ensure'] ? {
      /^(present|running|paused)$/ => 'present',
      'absent'                     => 'absent',
      default                      => 'present',
    }

    $connector_state_ensure = $connector[1]['ensure'] ? {
      /^(present|running)$/ => 'RUNNING',
      'paused'              => 'PAUSED',
      'absent'              => undef,
      default               => undef,
    }

    if ($connector_ensure == 'present' and !$connector_config) {
      fail("Connector config required, unless ensure is set to absent. \
        \n Validation error on ${connector_name} data, please correct. \n")
    }

    if $kafka_connect::owner {
      $_owner = $kafka_connect::owner
    } else {
      $_owner = $kafka_connect::user
    }

    file { "${kafka_connect::connector_config_dir}/${connector_file_name}" :
      ensure  => $connector_ensure,
      owner   => $_owner,
      group   => $kafka_connect::group,
      mode    => $kafka_connect::connector_config_file_mode,
      content => stdlib::to_json($connector_full_config),
      before  => Kc_connector[$connector_name],
    }

    kc_connector { $connector_name :
      ensure                  => $connector_ensure,
      config_file             => "${kafka_connect::connector_config_dir}/${connector_file_name}",
      connector_state_ensure  => $connector_state_ensure,
      hostname                => $kafka_connect::hostname,
      port                    => $kafka_connect::rest_port,
      enable_delete           => $kafka_connect::enable_delete,
      restart_on_failed_state => $kafka_connect::restart_on_failed_state,
    }
  }
}