作者小头像 Lv.1
0 成长值

个人介绍

这个人很懒,什么都没有留下

感兴趣或擅长的领域

暂无数据
个人勋章
TA还没获得勋章~
成长雷达
0
0
0
0
0

个人资料

个人介绍

这个人很懒,什么都没有留下

感兴趣或擅长的领域

暂无数据

达成规则

他的回复:
public class LoginUtil { private static final Logger LOG = LoggerFactory.getLogger(LoginUtil.class); /** * no JavaDoc */ public enum Module { STORM("StormClient"), KAFKA("KafkaClient"), ZOOKEEPER("Client"); private String name; private Module(String name) { this.name = name; } public String getName() { return name; } } /** * line operator string */ private static final String LINE_SEPARATOR = System.getProperty("line.separator"); /** * jaas file postfix */ private static final String JAAS_POSTFIX = ".jaas.conf"; /** * is IBM jdk or not */ private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM"); /** * IBM jdk login module */ private static final String IBM_LOGIN_MODULE = "com.ibm.security.auth.module.Krb5LoginModule required"; /** * oracle jdk login module */ private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; /** * Zookeeper quorum principal. */ public static final String ZOOKEEPER_AUTH_PRINCIPAL = "zookeeper.server.principal"; /** * java security krb5 file path */ public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; /** * java security login file path */ public static final String JAVA_SECURITY_LOGIN_CONF = "java.security.auth.login.config"; /** * 设置jaas.conf文件 * * @param principal * @param keytabPath * @throws IOException */ public static void setJaasFile(String principal, String keytabPath) throws IOException { String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX; // windows路径下分隔符替换 jaasPath = jaasPath.replace("\\", "\\\\"); // 删除jaas文件 deleteJaasFile(jaasPath); writeJaasFile(jaasPath, principal, keytabPath); System.out.println(JAVA_SECURITY_LOGIN_CONF+jaasPath); System.setProperty(JAVA_SECURITY_LOGIN_CONF, jaasPath); System.out.println("--------------------------------------"); System.out.println(jaasPath); } /** * 设置zookeeper服务端principal * * @param zkServerPrincipal * @throws IOException */ public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException { System.setProperty(ZOOKEEPER_AUTH_PRINCIPAL, zkServerPrincipal); String ret = System.getProperty(ZOOKEEPER_AUTH_PRINCIPAL); if (ret == null) { throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is null."); } if (!ret.equals(zkServerPrincipal)) { throw new IOException(ZOOKEEPER_AUTH_PRINCIPAL + " is " + ret + " is not " + zkServerPrincipal + "."); } } /** * 设置krb5文件 * * @param krb5ConfFile * @throws IOException */ public static void setKrb5Config(String krb5ConfFile) throws IOException { System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5ConfFile); String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF); if (ret == null) { throw new IOException(JAVA_SECURITY_KRB5_CONF + " is null."); } if (!ret.equals(krb5ConfFile)) { throw new IOException(JAVA_SECURITY_KRB5_CONF + " is " + ret + " is not " + krb5ConfFile + "."); } } /** * 写入jaas文件 * * @throws IOException * 写文件异常 */ private static void writeJaasFile(String jaasPath, String principal, String keytabPath) throws IOException { FileWriter writer = new FileWriter(new File(jaasPath)); try { writer.write(getJaasConfContext(principal, keytabPath)); writer.flush(); } catch (IOException e) { throw new IOException("Failed to create jaas.conf File"); } finally { writer.close(); } } private static void deleteJaasFile(String jaasPath) throws IOException { File jaasFile = new File(jaasPath); if (jaasFile.exists()) { if (!jaasFile.delete()) { throw new IOException("Failed to delete exists jaas file."); } } } private static String getJaasConfContext(String principal, String keytabPath) { Module[] allModule = Module.values(); StringBuilder builder = new StringBuilder(); for (Module modlue : allModule) { builder.append(getModuleContext(principal, keytabPath, modlue)); } return builder.toString(); } private static String getModuleContext(String userPrincipal, String keyTabPath, Module module) { StringBuilder builder = new StringBuilder(); if (IS_IBM_JDK) { builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); builder.append(IBM_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append("credsType=both").append(LINE_SEPARATOR); builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); builder.append("useKeytab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); builder.append("debug=true;").append(LINE_SEPARATOR); builder.append("};").append(LINE_SEPARATOR); } else { builder.append(module.getName()).append(" {").append(LINE_SEPARATOR); builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR); builder.append("useKeyTab=true").append(LINE_SEPARATOR); builder.append("keyTab=\"" + keyTabPath + "\"").append(LINE_SEPARATOR); builder.append("principal=\"" + userPrincipal + "\"").append(LINE_SEPARATOR); builder.append("useTicketCache=false").append(LINE_SEPARATOR); builder.append("storeKey=true").append(LINE_SEPARATOR); builder.append("debug=true;").append(LINE_SEPARATOR); builder.append("};").append(LINE_SEPARATOR); } return builder.toString(); } public static void securityPrepare(String principal, String keyTabFile) throws IOException { String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator; String krbFile = filePath + "krb5.conf"; String userKeyTableFile = filePath + keyTabFile; // windows路径下分隔符替换 userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); krbFile = krbFile.replace("\\", "\\\\"); LoginUtil.setKrb5Config(krbFile); LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); LoginUtil.setJaasFile(principal, userKeyTableFile); } /** * Check security mode * * @return boolean */ public static Boolean isSecurityModel() { Boolean isSecurity = false; String krbFilePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "kafkaSecurityMode"; Properties securityProps = new Properties(); // file does not exist. if (!isFileExists(krbFilePath)) { return isSecurity; } try { securityProps.load(new FileInputStream(krbFilePath)); if ("yes".equalsIgnoreCase(securityProps.getProperty("kafka.client.security.mode"))) { isSecurity = true; } } catch (Exception e) { LOG.info("The Exception occured : {}.", e); } return isSecurity; } /* * 判断文件是否存在 */ private static boolean isFileExists(String fileName) { File file = new File(fileName); return file.exists(); } }
他的回复:
public class Producer extends Thread { private static final Logger LOG = LoggerFactory.getLogger(Producer.class); private final KafkaProducer producer; private final String topic; private final Boolean isAsync; // Broker地址列表 private final static String BOOTSTRAP_SERVER = "bootstrap.servers"; // 客户端ID private final static String CLIENT_ID = "client.id"; // Key序列化类 private final static String KEY_SERIALIZER = "key.serializer"; // Value序列化类 private final static String VALUE_SERIALIZER = "value.serializer"; // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT private final static String SECURITY_PROTOCOL = "security.protocol"; // 服务名 private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"; // 域名 private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name"; // 分区类名 private final static String PARTITIONER_NAME = "partitioner.class"; /** * 用户自己申请的机机账号keytab文件名称 */ private static final String USER_KEYTAB_FILE = "user.keytab"; /** * 用户自己申请的机机账号名称 */ private static final String USER_PRINCIPAL = "xxxxxxx"; /** * Producer constructor * * @param topicName Topic名称 * @param asyncEnable 是否异步模式发送 */ public Producer(String topicName, Boolean asyncEnable) { Properties props = initProperties(); producer = new KafkaProducer(props); topic = topicName; isAsync = asyncEnable; } public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker地址列表 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // 客户端ID props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); // Key序列化类 props.put(KEY_SERIALIZER, kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // Value序列化类 props.put(VALUE_SERIALIZER, kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 分区类名 props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); return props; } private static List messList; /** * 生产者线程执行函数,循环发送消息。 */ public void run() { LOG.info("New Producer: start."); int keyNum = 1; int messageNo = 20; while (keyNum = messageNo) { String messageStr = "message : " + keyNum; String key = String.valueOf(keyNum); ProducerRecord record = new ProducerRecord("GSWL_TEST", key, messageStr); try { // 同步发送 producer.send(record).get(); } catch (InterruptedException ie) { LOG.info("The InterruptedException occured : {}.", ie); } catch (ExecutionException ee) { LOG.info("The ExecutionException occured : {}.", ee); } keyNum++; } } public static void main(String[] args) { if (LoginUtil.isSecurityModel()) { try { LOG.info("Securitymode start."); // !!注意,安全认证时,需要用户手动修改为自己申请的机机账号 LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); } catch (IOException e) { LOG.error("Security prepare failure."); LOG.error("The IOException occured.", e); return; } LOG.info("Security prepare success."); } // 是否使用异步发送模式 final boolean asyncEnable = false; Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable); producerThread.start(); System.out.println("Message send job finished..................."); } static class DemoCallBack implements Callback { private final Logger logger = LoggerFactory.getLogger(DemoCallBack.class); private long startTime; private int key; private String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * 回调函数,用于处理异步发送模式下,消息发送到服务端后的处理。 * * @param metadata 元数据信息 * @param exception 发送异常。如果没有错误发生则为Null。 */ @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { logger.info("message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); } else if (exception != null) { logger.error("The Exception occured.", exception); } } } }