CF_OCR.py

import logging
import json
import os
import time

from google.cloud import pubsub_v1
from google.cloud import vision, storage
from google.protobuf import json_format

# NOTE: vision.types / vision.enums below assume google-cloud-vision < 2.0,
# where those modules still exist.
publisher_client = pubsub_v1.PublisherClient()
vision_client = vision.ImageAnnotatorClient()
storage_client = storage.Client()

project_id = os.environ['GCP_PROJECT']
RESULT_TOPIC = os.environ['RESULT_TOPIC']  # e.g. pdf2text
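
# For reference, a deployment along these lines would wire up the Cloud Storage
# trigger and the RESULT_TOPIC variable used above. The bucket and topic names
# are placeholders, not taken from the original repo:
#
#   gcloud functions deploy processPDFFile \
#       --runtime python37 \
#       --trigger-resource <upload-bucket> \
#       --trigger-event google.storage.object.finalize \
#       --set-env-vars RESULT_TOPIC=pdf2text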

def documentOCR(vision_client, gcs_source_uri, gcs_destination_uri, batch_size=20):
    """
    Runs asynchronous document text detection (OCR) on a PDF stored in GCS and
    writes the JSON results to the destination URI.

    Args:
        vision_client: vision.ImageAnnotatorClient - Vision API client.
        gcs_source_uri: str - GCS URI of the source PDF, e.g. gs://bucket/doc.pdf.
        gcs_destination_uri: str - GCS URI prefix where the JSON output is written.
        batch_size: int - Number of pages grouped into each JSON output file.
    Returns:
        None
    """
    doc_title = gcs_source_uri.split('/')[-1].split('.pdf')[0]

    # Supported mime_types are: 'application/pdf' and 'image/tiff'
    mime_type = 'application/pdf'

    # Feature in vision API
    feature = vision.types.Feature(
        type=vision.enums.Feature.Type.DOCUMENT_TEXT_DETECTION)

    gcs_source = vision.types.GcsSource(uri=gcs_source_uri)
    input_config = vision.types.InputConfig(
        gcs_source=gcs_source, mime_type=mime_type)

    gcs_destination = vision.types.GcsDestination(uri=gcs_destination_uri)
    output_config = vision.types.OutputConfig(
        gcs_destination=gcs_destination, batch_size=batch_size)

    async_request = vision.types.AsyncAnnotateFileRequest(
        features=[feature], input_config=input_config,
        output_config=output_config)

    operation = vision_client.async_batch_annotate_files(
        requests=[async_request])

    # Block until the long-running operation finishes (or times out).
    operation.result(timeout=180)
    logging.info('Text extraction from document {} is completed.'.format(doc_title))

def readJsonResult(storage_client, bucket_name, doc_title):
    """
    Parses the JSON files produced by the Vision API and extracts their text.

    Args:
        storage_client: storage.Client - Cloud Storage client.
        bucket_name: str - Bucket holding the JSON OCR output.
        doc_title: str - Document title used as the output file prefix.
    Returns:
        all_text: str - Containing all text of the document.
    """
    gcs_src_prefix = 'json/' + '{}-'.format(doc_title)

    # List the sharded JSON objects written by the Vision API
    # (one object per batch of pages) under the given prefix.
    bucket_client = storage_client.get_bucket(bucket_name)
    blob_list = list(bucket_client.list_blobs(prefix=gcs_src_prefix))

    all_text = ''
    for blob in blob_list:
        json_string = blob.download_as_string()
        response = json_format.Parse(
            json_string, vision.types.AnnotateFileResponse())

        # Each AnnotateFileResponse holds one AnnotateImageResponse per page.
        for page_response in response.responses:
            text_response = page_response.full_text_annotation.text
            all_text += text_response
            all_text += ' '

    logging.info("Parsing of {} json doc was successful.".format(doc_title))
    return all_text

def uploadBlob(storage_client, bucket_name, txt_content, destination_blob_name):
    """
    Uploads the extracted text to the bucket.

    Args:
        storage_client: storage.Client - Cloud Storage client.
        bucket_name: str - Destination bucket.
        txt_content: str - Text to upload.
        destination_blob_name: str - Destination prefix / object name.
    Returns:
        None
    """
    # Strip a possible gs://<bucket>/ prefix so only the object name remains.
    destination_blob_name = destination_blob_name.split('gs://{}/'.format(bucket_name))[-1]
    bucket_client = storage_client.bucket(bucket_name)
    blob = bucket_client.blob(destination_blob_name)
    blob.upload_from_string(txt_content)
    logging.info("Text uploaded to {}".format(destination_blob_name))

def publishMsg(text, doc_title, topic_name):
    """
    Publishes a message with the extracted text and the document title.

    Args:
        text: str - Text contained in the document.
        doc_title: str - Document title (filename without extension).
        topic_name: str - Pub/Sub topic to publish to.
    Returns:
        None
    """
    # Compose the message to be sent to Pub/Sub.
    message = {
        'text': text,
        'doc_title': doc_title,
    }

    # The message data needs to be a bytestring.
    # Refer to the documentation:
    # https://googleapis.dev/python/pubsub/latest/publisher/api/client.html
    message_data = json.dumps(message).encode('utf-8')
    topic_path = publisher_client.topic_path(project_id, topic_name)

    # The publish method returns a future.
    future = publisher_client.publish(topic_path, data=message_data)

    # Calling result() blocks until publishing succeeds and returns the message ID.
    # Refer to the documentation:
    # https://googleapis.dev/python/pubsub/latest/publisher/api/futures.html#google.cloud.pubsub_v1.publisher.futures.Future
    message_id = future.result()
    logging.info("Message id: {} was published in topic: {}".format(message_id, topic_name))

def processPDFFile(file, context):
    """
    Triggered when a PDF file is uploaded to the GCS bucket of interest.

    Args:
        file (dict): Metadata of the changed file, provided by the triggering
            Cloud Storage event.
        context (google.cloud.functions.Context): Metadata of triggering event.
    Returns:
        None; the output is written to stdout and Stackdriver Logging.
    """
    start_time = time.time()
    src_bucket = file.get('bucket')
    dest_bucket = 'covid19-repo-test'

    prefix_and_doc_title = file.get('name')
    doc_title = prefix_and_doc_title.split('/')[-1].split('.')[0]
    print('name is: {}'.format(prefix_and_doc_title))

    # Step 1: Call OCR helper function
    gcs_source_path = 'gs://' + src_bucket + '/' + prefix_and_doc_title
    print('source gcs path: {}'.format(gcs_source_path))
    print('=============================')
    json_gcs_dest_path = 'gs://' + dest_bucket + '/json/' + doc_title + '-'
    print('destination json path: {}'.format(json_gcs_dest_path))
    print('=============================')
    documentOCR(vision_client, gcs_source_path, json_gcs_dest_path)
    print("Completed OCR!")
    print('=============================')

    # Step 2: Parse the JSON output
    text = readJsonResult(storage_client, dest_bucket, doc_title)
    print("Completed json parsing!")
    print('=============================')

    # Step 3: Publish on Pub/Sub
    topic_name = RESULT_TOPIC
    publishMsg(text, doc_title, topic_name)
    print("Completed pubsub messaging!")
    print('=============================')

    # Step 4: Save the raw text on GCS
    upload_dest_prefix = 'raw_txt/{}.txt'.format(doc_title)
    uploadBlob(storage_client, dest_bucket, text, upload_dest_prefix)
    print("Completed upload!")
    print('=============================')

    print('File {} processed.'.format(doc_title))
    elapsed_time = time.time() - start_time
    logging.info("Completion of text_extract took: {} seconds".format(round(elapsed_time, 1)))
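

# A minimal local smoke test, assuming a PDF already sits in an existing source
# bucket and the GCP_PROJECT / RESULT_TOPIC environment variables are set. The
# bucket and object names below are made up for illustration only.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    mock_event = {
        'bucket': 'my-upload-bucket',  # hypothetical source bucket
        'name': 'uploads/sample.pdf',  # hypothetical object path
    }
    # context is unused by processPDFFile, so None is enough for a local run.
    processPDFFile(mock_event, context=None)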