Skip to main content

대용량 데이터 처리를 위한 기술 활용 (Kafka, ES)

대용량 데이터 처리 및 Kafka + Bulk 활용

데이터 처리 흐름

  1. DB -> Kafka -> ElasticSearch 순서로 데이터가 전달됨

  2. Kafka를 bucket 단위로 나누어 데이터 전송하여 효율적인 처리

  3. ElasticSearch에는 Bulk API를 활용하여 대량의 데이터를 일괄 삽입

주요 개선 사항

  1. Kafka를 Bucket 단위로 나눠서 전송 → 데이터 스트리밍 성능 최적화

  2. ElasticSearch에 Bulk 처리 적용 → 인덱싱 성능 향상 및 네트워크 부하 감소

  3. 대용량 데이터(150,000개 이상)를 효율적으로 처리하여 빠른 데이터 동기화 가능

Kafka Bucket 처리 예시

qryCnt := models.QryCount(y.Db, cntStr)
	fmt.Println("qryCnt:", qryCnt)

	bucketCnt := 0
	if qryCnt == 0 {
		bucketCnt = 0
	} else {
		bucketCnt = 1 + (int(qryCnt) / p.Limit)
	}

    fmt.Println("bucketCnt:", bucketCnt)
	for i := 0; i < bucketCnt; i++ {
		// for i := 0; i < 3; i++ {
		p.Offset = p.Limit * i
		sqlStr, _, _ := models_join.ListType1PlainQryStrGet(y, lt, p, q, "/list")
		if err := t.ProduceaBucketToKafka(y, "kkk", headers, sqlStr, topic); err != nil {
			return errors.New(e.PageQryErr("vrt452", ": "+err.Error()))
		}
	}

👉 Bucket을 활용하여 병렬 처리 가능!

ElasticSearch Bulk API 예시

	for _, row := range v.Page {
		row.ClientCode = clientCode
		fmt.Printf("BuyerId: %d, LastSorderDate: %s, ClientCode : %s\n", row.Id, row.LastSorderDate, row.ClientCode)
		row.LastVistDate = row.LastSorderDate
		// Meta 데이터 작성
		meta := fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }%s`, esIndex, row.Id, "\n")
		bulkRequest.WriteString(meta)

		// 문서 데이터 작성
		data, _ := json.Marshal(row)
		bulkRequest.WriteString(string(data) + "\n")
		fmt.Println(string(data)) // 데이터를 출력하여 검증

		fmt.Printf("Prepared document for BuyerId: %d\n", row.Id)
	}

	// Bulk 요청 전송
	res, err := es.Bulk(bytes.NewReader(bulkRequest.Bytes()))
	if err != nil {
		return fmt.Errorf("bulk request failed: %w", err)
	}
	defer res.Body.Close()

	// Elasticsearch 응답 확인
	if res.IsError() {
		log.Printf("Bulk indexing failed: %s", res.String())
		return fmt.Errorf("bulk indexing failed")
	}

👉 Bulk API를 활용하면 한 번에 많은 데이터를 삽입 가능!

이 방식은 대량의 데이터를 효율적으로 처리하고 성능을 최적화할 수 있도록 설계됨.