Puppet Class: kafka_connect

Defined in:
manifests/init.pp

Summary

Main kafka_connect class.

Overview

Examples:

Basic setup.

include kafka_connect

Typical deployment with a 3 node Kafka cluster, S3 plugin, and Schema Registry config.

class { 'kafka_connect':
  config_storage_replication_factor   => 3,
  offset_storage_replication_factor   => 3,
  status_storage_replication_factor   => 3,
  bootstrap_servers                   => [ 'kafka-01:9092', 'kafka-02:9092', 'kafka-03:9092' ],
  confluent_hub_plugins               => [ 'confluentinc/kafka-connect-s3:10.5.7' ],
  value_converter_schema_registry_url => "http://schemaregistry-elb.${facts['networking']['domain']}:8081",
}

Custom logging options, with the Elasticsearch plugin.

class { 'kafka_connect':
  log4j_enable_stdout       => true,
  log4j_custom_config_lines => [ 'log4j.logger.io.confluent.connect.elasticsearch=DEBUG' ],
  confluent_hub_plugins     => [ 'confluentinc/kafka-connect-elasticsearch:latest' ],
}

Only manage connectors, not the full setup (i.e. without install/config/service classes).

class { 'kafka_connect':
  manage_connectors_only => true,
  connector_config_dir   => '/opt/kafka-connect/etc',
  rest_port              => 8084,
  enable_delete          => true,
}

Standalone mode with local Kafka and ZooKeeper services.

class { 'kafka_connect':
  config_mode                   => 'standalone',
  run_local_kafka_broker_and_zk => true,
}

Apache archive source install.

class { 'kafka_connect':
  install_source        => 'archive',
  connector_config_dir  => '/opt/kafka/config/connectors',
  user                  => 'kafka',
  group                 => 'kafka',
  service_name          => 'kafka-connect',
  manage_user_and_group => true,
  manage_confluent_repo => false,
}
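
A further sketch, not taken from the module itself: tuning the JVM heap and switching to daily log rotation with the parameters documented below (the values shown are placeholders only).

class { 'kafka_connect':
  kafka_heap_options          => '-Xms512M -Xmx4G',
  log4j_file_appender         => 'DailyRollingFileAppender',
  log4j_appender_date_pattern => '\'.\'yyyy-MM-dd',
}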

Parameters:

  • manage_connectors_only (Boolean) (defaults to: false)

    Flag for including only the connector management class.

  • manage_confluent_repo (Boolean) (defaults to: true)

    Flag for including the Confluent repo class.

  • manage_user_and_group (Boolean) (defaults to: false)

    Flag for managing the service user & group.

  • include_java (Boolean) (defaults to: false)

    Flag for including class java.

  • repo_ensure (Enum['present', 'absent']) (defaults to: 'present')

    Ensure value for the Confluent package repo resource.

  • repo_enabled (Boolean) (defaults to: true)

    Enabled value for the Confluent package repo resource.

  • repo_version (Pattern[/^(\d+\.\d+|\d+)$/]) (defaults to: '7.7')

    Version of the Confluent repo to configure.

  • install_source (Enum['package', 'archive']) (defaults to: 'package')

    Installation source to use, either Confluent package or Apache archive.

  • package_name (String[1]) (defaults to: 'confluent-kafka')

    Name of the main KC package to manage.

  • package_ensure (String[1]) (defaults to: '7.7.1-1')

    State of the package to ensure. Note that this may be used by more than one resource, depending on the setup.

  • manage_schema_registry_package (Boolean) (defaults to: true)

    Flag for managing the Schema Registry package (and REST Utils dependency package).

  • schema_registry_package_name (String[1]) (defaults to: 'confluent-schema-registry')

    Name of the Schema Registry package.

  • confluent_rest_utils_package_name (String[1]) (defaults to: 'confluent-rest-utils')

    Name of the Confluent REST Utils package.

  • confluent_hub_plugin_path (Stdlib::Absolutepath) (defaults to: '/usr/share/confluent-hub-components')

    Installation path for Confluent Hub plugins.

  • confluent_hub_plugins (Kafka_connect::HubPlugins) (defaults to: [])

    List of Confluent Hub plugins to install. Each should be in the format ‘author/name:semantic-version’, e.g. ‘acme/fancy-plugin:0.1.0’. ‘latest’ is also accepted in place of a specific version (see the sketch after this parameter list).

  • confluent_hub_client_package_name (String[1]) (defaults to: 'confluent-hub-client')

    Name of the Confluent Hub Client package.

  • confluent_common_package_name (String[1]) (defaults to: 'confluent-common')

    Name of the Confluent Common package.

  • archive_install_dir (Stdlib::Absolutepath) (defaults to: '/opt/kafka')

    Install directory to use for Apache archive-based setup.

  • archive_source (Stdlib::HTTPUrl) (defaults to: 'https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz')

    Download source to use for Apache archive-based setup.

  • config_mode (Enum['distributed', 'standalone']) (defaults to: 'distributed')

    Configuration mode to use for the setup.

  • kafka_heap_options (String[1]) (defaults to: '-Xms256M -Xmx2G')

    Value to set for ‘KAFKA_HEAP_OPTS’ export.

  • kc_config_dir (Stdlib::Absolutepath) (defaults to: '/etc/kafka')

    Configuration directory for KC properties files.

  • config_storage_replication_factor (Integer) (defaults to: 1)

    Config value to set for ‘config.storage.replication.factor’.

  • config_storage_topic (String[1]) (defaults to: 'connect-configs')

    Config value to set for ‘config.storage.topic’.

  • group_id (String[1]) (defaults to: 'connect-cluster')

    Config value to set for ‘group.id’.

  • bootstrap_servers (Array[String[1]]) (defaults to: ['localhost:9092'])

    Config value to set for ‘bootstrap.servers’.

  • key_converter (String[1]) (defaults to: 'org.apache.kafka.connect.json.JsonConverter')

    Config value to set for ‘key.converter’.

  • key_converter_schemas_enable (Boolean) (defaults to: true)

    Config value to set for ‘key.converter.schemas.enable’.

  • listeners (Stdlib::HTTPUrl) (defaults to: 'HTTP://:8083')

    Config value to set for ‘listeners’.

  • log4j_file_appender (Kafka_connect::LogAppender) (defaults to: 'RollingFileAppender')

    Log4j file appender type to use (RollingFileAppender or DailyRollingFileAppender).

  • log4j_appender_file_path (Stdlib::Absolutepath) (defaults to: '/var/log/confluent/connect.log')

    Config value to set for ‘log4j.appender.file.File’.

  • log4j_appender_max_file_size (String[1]) (defaults to: '100MB')

    Config value to set for ‘log4j.appender.file.MaxFileSize’. Only used if log4j_file_appender = ‘RollingFileAppender’.

  • log4j_appender_max_backup_index (Integer) (defaults to: 10)

    Config value to set for ‘log4j.appender.file.MaxBackupIndex’. Only used if log4j_file_appender = ‘RollingFileAppender’.

  • log4j_appender_date_pattern (String[1]) (defaults to: '\'.\'yyyy-MM-dd-HH')

    Config value to set for ‘log4j.appender.file.DatePattern’. Only used if log4j_file_appender = ‘DailyRollingFileAppender’.

  • log4j_enable_stdout (Boolean) (defaults to: false)

    Option to enable logging to stdout/console.

  • log4j_custom_config_lines (Optional[Array[String[1]]]) (defaults to: undef)

    Option to provide additional custom logging configuration. Can be used, for example, to adjust the log level for a specific connector type. See: docs.confluent.io/platform/current/connect/logging.html#use-the-kconnect-log4j-properties-file

  • log4j_loglevel_rootlogger (Kafka_connect::Loglevel) (defaults to: 'INFO')

    Config value to set for ‘log4j.rootLogger’.

  • offset_storage_file_filename (String[1]) (defaults to: '/tmp/connect.offsets')

    Config value to set for ‘offset.storage.file.filename’. Only used in standalone mode.

  • offset_flush_interval_ms (Integer) (defaults to: 10000)

    Config value to set for ‘offset.flush.interval.ms’.

  • offset_storage_topic (String[1]) (defaults to: 'connect-offsets')

    Config value to set for ‘offset.storage.topic’.

  • offset_storage_replication_factor (Integer) (defaults to: 1)

    Config value to set for ‘offset.storage.replication.factor’.

  • offset_storage_partitions (Integer) (defaults to: 25)

    Config value to set for ‘offset.storage.partitions’.

  • plugin_path (Stdlib::Absolutepath) (defaults to: '/usr/share/java,/usr/share/confluent-hub-components')

    Config value to set for ‘plugin.path’.

  • status_storage_topic (String[1]) (defaults to: 'connect-status')

    Config value to set for ‘status.storage.topic’.

  • status_storage_replication_factor (Integer) (defaults to: 1)

    Config value to set for ‘status.storage.replication.factor’.

  • status_storage_partitions (Integer) (defaults to: 5)

    Config value to set for ‘status.storage.partitions’.

  • value_converter (String[1]) (defaults to: 'org.apache.kafka.connect.json.JsonConverter')

    Config value to set for ‘value.converter’.

  • value_converter_schema_registry_url (Optional[Stdlib::HTTPUrl]) (defaults to: undef)

    Config value to set for ‘value.converter.schema.registry.url’, if defined.

  • value_converter_schemas_enable (Boolean) (defaults to: true)

    Config value to set for ‘value.converter.schemas.enable’.

  • manage_systemd_service_file (Boolean) (defaults to: true)

    Flag for managing systemd service unit file(s).

  • service_name (String[1]) (defaults to: 'confluent-kafka-connect')

    Name of the service to manage.

  • service_ensure (Stdlib::Ensure::Service) (defaults to: 'running')

    State of the service to ensure.

  • service_enable (Boolean) (defaults to: true)

    Value for enabling the service at boot.

  • service_provider (Optional[String[1]]) (defaults to: undef)

    Backend provider to use for the service resource.

  • run_local_kafka_broker_and_zk (Boolean) (defaults to: false)

    Flag for running local Kafka broker and ZooKeeper services. Intended only for use with standalone config mode.

  • user (Variant[String[1], Integer]) (defaults to: 'cp-kafka-connect')

    User to run the service as and to set as the owner on config files, etc.

  • group (Variant[String[1], Integer]) (defaults to: 'confluent')

    Group the service will run as.

  • user_and_group_ensure (Enum['present', 'absent']) (defaults to: 'present')

    Value to set for ensure on user & group, if managed.

  • owner (Optional[ Variant[String[1], Integer] ]) (defaults to: undef)

    Owner to set on config files. Deprecated: use the ‘user’ parameter instead.

  • connector_config_dir (Stdlib::Absolutepath) (defaults to: '/etc/kafka-connect')

    Configuration directory for connector properties files.

  • connector_config_file_mode (Stdlib::Filemode) (defaults to: '0640')

    Mode to set on connector config file.

  • connector_secret_file_mode (Stdlib::Filemode) (defaults to: '0600')

    Mode to set on connector secret file.

  • hostname (String[1]) (defaults to: 'localhost')

    The hostname or IP of the KC service.

  • rest_port (Stdlib::Port) (defaults to: 8083)

    Port to connect to for the REST API.

  • enable_delete (Boolean) (defaults to: false)

    Enable deletion of running connectors. Required for the provider to actually remove a connector when its ensure is set to absent.

  • restart_on_failed_state (Boolean) (defaults to: false)

    Allow the provider to auto restart on FAILED connector state.
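
As a sketch of the plugin list format described above (reusing the plugin names from the examples earlier on this page, together with the default plugin path):

class { 'kafka_connect':
  confluent_hub_plugin_path => '/usr/share/confluent-hub-components',
  confluent_hub_plugins     => [
    'confluentinc/kafka-connect-s3:10.5.7',
    'confluentinc/kafka-connect-elasticsearch:latest',
  ],
}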

Author:



# File 'manifests/init.pp', line 260

class kafka_connect (
  # kafka_connect
  Boolean                           $manage_connectors_only              = false,
  Boolean                           $manage_confluent_repo               = true,
  Boolean                           $manage_user_and_group               = false,
  Boolean                           $include_java                        = false,

  # kafka_connect::confluent_repo
  Enum['present', 'absent']         $repo_ensure                         = 'present',
  Boolean                           $repo_enabled                        = true,
  Pattern[/^(\d+\.\d+|\d+)$/]       $repo_version                        = '7.7',

  # kafka_connect::install
  Enum['package', 'archive']        $install_source                      = 'package',
  String[1]                         $package_name                        = 'confluent-kafka',
  String[1]                         $package_ensure                      = '7.7.1-1',
  Boolean                           $manage_schema_registry_package      = true,
  String[1]                         $schema_registry_package_name        = 'confluent-schema-registry',
  String[1]                         $confluent_rest_utils_package_name   = 'confluent-rest-utils',
  Stdlib::Absolutepath              $confluent_hub_plugin_path           = '/usr/share/confluent-hub-components',
  Kafka_connect::HubPlugins         $confluent_hub_plugins               = [],
  String[1]                         $confluent_hub_client_package_name   = 'confluent-hub-client',
  String[1]                         $confluent_common_package_name       = 'confluent-common',
  Stdlib::Absolutepath              $archive_install_dir                 = '/opt/kafka',
  Stdlib::HTTPUrl                   $archive_source                      = 'https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz',

  # kafka_connect::config
  Enum['distributed', 'standalone'] $config_mode                         = 'distributed',
  String[1]                         $kafka_heap_options                  = '-Xms256M -Xmx2G',
  Stdlib::Absolutepath              $kc_config_dir                       = '/etc/kafka',
  Integer                           $config_storage_replication_factor   = 1,
  String[1]                         $config_storage_topic                = 'connect-configs',
  String[1]                         $group_id                            = 'connect-cluster',
  Array[String[1]]                  $bootstrap_servers                   = ['localhost:9092'],
  String[1]                         $key_converter                       = 'org.apache.kafka.connect.json.JsonConverter',
  Boolean                           $key_converter_schemas_enable        = true,
  Stdlib::HTTPUrl                   $listeners                           = 'HTTP://:8083',
  Kafka_connect::LogAppender        $log4j_file_appender                 = 'RollingFileAppender',
  Stdlib::Absolutepath              $log4j_appender_file_path            = '/var/log/confluent/connect.log',
  String[1]                         $log4j_appender_max_file_size        = '100MB',
  Integer                           $log4j_appender_max_backup_index     = 10,
  String[1]                         $log4j_appender_date_pattern         = '\'.\'yyyy-MM-dd-HH',
  Boolean                           $log4j_enable_stdout                 = false,
  Optional[Array[String[1]]]        $log4j_custom_config_lines           = undef,
  Kafka_connect::Loglevel           $log4j_loglevel_rootlogger           = 'INFO',
  String[1]                         $offset_storage_file_filename        = '/tmp/connect.offsets',
  Integer                           $offset_flush_interval_ms            = 10000,
  String[1]                         $offset_storage_topic                = 'connect-offsets',
  Integer                           $offset_storage_replication_factor   = 1,
  Integer                           $offset_storage_partitions           = 25,
  Stdlib::Absolutepath              $plugin_path                         = '/usr/share/java,/usr/share/confluent-hub-components',
  String[1]                         $status_storage_topic                = 'connect-status',
  Integer                           $status_storage_replication_factor   = 1,
  Integer                           $status_storage_partitions           = 5,
  String[1]                         $value_converter                     = 'org.apache.kafka.connect.json.JsonConverter',
  Optional[Stdlib::HTTPUrl]         $value_converter_schema_registry_url = undef,
  Boolean                           $value_converter_schemas_enable      = true,
  Boolean                           $manage_systemd_service_file         = true,

  # kafka_connect::service
  String[1]                         $service_name                        = 'confluent-kafka-connect',
  Stdlib::Ensure::Service           $service_ensure                      = 'running',
  Boolean                           $service_enable                      = true,
  Optional[String[1]]               $service_provider                    = undef,
  Boolean                           $run_local_kafka_broker_and_zk       = false,

  # kafka_connect::user
  Variant[String[1], Integer]       $user                                = 'cp-kafka-connect',
  Variant[String[1], Integer]       $group                               = 'confluent',
  Enum['present', 'absent']         $user_and_group_ensure               = 'present',
  Optional[
    Variant[String[1], Integer]
  ]                                 $owner                               = undef, # deprecated, use $user

  # kafka_connect::manage_connectors
  Stdlib::Absolutepath              $connector_config_dir                = '/etc/kafka-connect',
  Stdlib::Filemode                  $connector_config_file_mode          = '0640',
  Stdlib::Filemode                  $connector_secret_file_mode          = '0600',
  String[1]                         $hostname                            = 'localhost',
  Stdlib::Port                      $rest_port                           = 8083,
  Boolean                           $enable_delete                       = false,
  Boolean                           $restart_on_failed_state             = false,
) {
  if $include_java {
    include 'java'
  }

  if $owner {
    deprecation('kafka_connect::owner',
    'The $owner parameter is deprecated, please use $user instead.')
  }

  if $manage_connectors_only {
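    # Connector-management-only mode: contain just the connector management
    # class; the repo, install, config and service classes are skipped.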
    contain kafka_connect::manage_connectors
  } else {
    if $manage_confluent_repo {
      contain kafka_connect::confluent_repo

      Class['kafka_connect::confluent_repo']
      -> Class['kafka_connect::install']
    }

    if $manage_user_and_group {
      contain kafka_connect::user

      Class['kafka_connect::user']
      -> Class['kafka_connect::install']
    }

    contain kafka_connect::install
    contain kafka_connect::config
    contain kafka_connect::service
    contain kafka_connect::manage_connectors

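    # Ordering: install before config; changes to the config notify (restart)
    # the service; the service is managed before the connectors.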
    Class['kafka_connect::install']
    -> Class['kafka_connect::config']
    ~> Class['kafka_connect::service']
    -> Class['kafka_connect::manage_connectors']
  }
}