Puppet Class: kafka_connect::manage_connectors::connector

Defined in:
manifests/manage_connectors/connector.pp

Summary

Class to manage individual Kafka Connect connectors.

Overview

Parameters:

  • connectors_data (Kafka_connect::Connectors)

    Hash of connector names and their corresponding data.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'manifests/manage_connectors/connector.pp', line 8

class kafka_connect::manage_connectors::connector (
  Kafka_connect::Connectors $connectors_data,
) {
  # Private class: stdlib's assert_private() rejects use from outside the module.
  assert_private()

  # Each entry is read positionally: index 0 is the config file name,
  # index 1 is that connector's data ('name', 'config', optional 'ensure').
  $connectors_data.each |$entry| {
    $file_name        = $entry[0]
    $settings         = $entry[1]
    $requested_ensure = $settings['ensure']
    $connector_name   = $settings['name']
    $connector_config = $settings['config']
    $config_path      = "${kafka_connect::connector_config_dir}/${file_name}"

    # Resource-level ensure: everything except an explicit 'absent'
    # (including a missing value) is managed as 'present'.
    $resource_ensure = case $requested_ensure {
      /^(present|running|paused)$/: { 'present' }
      'absent':                     { 'absent' }
      default:                      { 'present' }
    }

    # Desired connector state handed to kc_connector; left undef unless the
    # requested ensure is present, running or paused.
    $target_state = case $requested_ensure {
      /^(present|running)$/: { 'RUNNING' }
      'paused':              { 'PAUSED' }
      default:               { undef }
    }

    # A connector that is being kept must come with a config.
    if (!$connector_config and $resource_ensure == 'present') {
      fail("Connector config required, unless ensure is set to absent. \
        \n Validation error on ${connector_name} data, please correct. \n")
    }

    # File ownership falls back to the service user when no owner is set.
    $file_owner = if $kafka_connect::owner {
      $kafka_connect::owner
    } else {
      $kafka_connect::user
    }

    # The on-disk config is the JSON form of { name, config } and is managed
    # before the connector resource that points at it.
    file { $config_path :
      ensure  => $resource_ensure,
      owner   => $file_owner,
      group   => $kafka_connect::group,
      mode    => $kafka_connect::connector_config_file_mode,
      content => stdlib::to_json({
        name   => $connector_name,
        config => $connector_config,
      }),
      before  => Kc_connector[$connector_name],
    }

    kc_connector { $connector_name :
      ensure                  => $resource_ensure,
      config_file             => $config_path,
      connector_state_ensure  => $target_state,
      hostname                => $kafka_connect::hostname,
      port                    => $kafka_connect::rest_port,
      enable_delete           => $kafka_connect::enable_delete,
      restart_on_failed_state => $kafka_connect::restart_on_failed_state,
    }
  }
}