AwaitMessageTriggerFunctionSensor Issue #34776
Unanswered
Venkatesh748 asked this question in Q&A
Replies: 1 comment
-
@Venkatesh748 Did you solve the problem?
-
Hi all,
When I use 'AwaitMessageTriggerFunctionSensor' in Airflow, I get the error below.
Error message: cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Failed to create consumer: ssl.ca.location failed: error:05880002:x509 certificate routines::system lib"} (log is attached)
I have set all the required certificate paths in the Kafka connection. The configuration used in the Airflow Kafka connection is below.
{
  "bootstrap.servers": "kaas-test-ctc-a.optum.com:443",
  "group.id": "hcc-dataplatform-kafka-consumer-group-v1",
  "security.protocol": "ssl",
  "auto.offset.reset": "earliest",
  "enable.ssl.certificate.verification": "true",
  "ssl.key.location": "/air_rxf/dags/kafka_conf/key.pem",
  "ssl.key.password": "xxxxxxx",
  "ssl.certificate.location": "/air_rxf/dags/kafka_conf/cert.pem",
  "ssl.ca.location": "/air_rxf/dags/kafka_conf/ca.crt",
  "ssl.keystore.location": "/air_rxf/dags/kafka_conf/keystore.p12",
  "ssl.keystore.password": "xxxxxxx"
}
However, I am able to consume Kafka messages using "ConsumeFromTopicOperator" with the same Kafka connection.
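One possible way to narrow this down is a small check of the certificate paths. This is only a sketch, not a confirmed fix. It assumes the sensor hands its consumer off to the Airflow triggerer process, which may not see the same files as the worker that runs "ConsumeFromTopicOperator". The helper name check_ssl_paths is hypothetical. It uses only the Python standard library and reports which configured files are missing, unreadable, or (for the CA file) not parseable as PEM.

```python
import os
import ssl

# Keys from the connection config above whose values are file paths.
PATH_KEYS = (
    "ssl.key.location",
    "ssl.certificate.location",
    "ssl.ca.location",
    "ssl.keystore.location",
)


def check_ssl_paths(config):
    """Return {key: problem} for every path key that looks unusable.

    Hypothetical diagnostic helper: it does not talk to Kafka, it only
    inspects the local filesystem of the process that runs it.
    """
    problems = {}
    for key in PATH_KEYS:
        path = config.get(key)
        if path is None:
            continue  # key not set in this connection
        if not os.path.isfile(path):
            problems[key] = "file not found: " + path
        elif not os.access(path, os.R_OK):
            problems[key] = "file not readable: " + path

    # Additionally try to load the CA bundle with Python's own ssl module.
    ca_path = config.get("ssl.ca.location")
    if ca_path and "ssl.ca.location" not in problems:
        try:
            ssl.create_default_context(cafile=ca_path)
        except (ssl.SSLError, OSError) as exc:
            problems["ssl.ca.location"] = "could not load CA file: %s" % exc
    return problems
```

Running check_ssl_paths with the connection's config dictionary in both the worker environment and the triggerer environment, then comparing the two results, would show whether /air_rxf/dags/kafka_conf/ca.crt is visible to only one of them. An empty dictionary means no problem was detected by this check.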
Versions used:
Airflow version: v2.5.0
Python version : Python 3.8.6 (default, Feb 26 2021, 11:24:21)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Please let me know how to fix this issue.
dag_id=dag_rxf_rxf_listen_the_topic_run_id=scheduled__2023-10-03T10_30_00+00_00_task_id=dag_rxf_rxf_listen_for_message_attempt=1.log